9 votes

Python multiprocessing : comment limiter le nombre de processus en attente ?

Lors de l'exécution d'un grand nombre de tâches (avec des paramètres importants) à l'aide de Pool.apply_async, les processus sont alloués et passent dans un état d'attente, et il n'y a pas de limite au nombre de processus en attente. Cela peut finir par consommer toute la mémoire, comme dans l'exemple ci-dessous :

import multiprocessing
import numpy as np

def f(a,b):
    return np.linalg.solve(a,b)

def test():

    p = multiprocessing.Pool()
    for _ in range(1000):
        p.apply_async(f, (np.random.rand(1000,1000),np.random.rand(1000)))
    p.close()
    p.join()

if __name__ == '__main__':
    test()

Je cherche un moyen de limiter la file d'attente, de manière à ce qu'il n'y ait qu'un nombre limité de processus en attente, et que Pool.apply_async soit bloqué tant que la file d'attente est pleine.

7voto

ecatmur Points 64173

multiprocessing.Pool a une _taskqueue de type multiprocessing.Queue qui prend en compte un élément optionnel maxsize Malheureusement, il le construit sans le paramètre maxsize le jeu de paramètres.

Je recommande de sous-classer multiprocessing.Pool avec un copier-coller de multiprocessing.Pool.__init__ qui passe maxsize a _taskqueue constructeur.

Monkey-Parcheando l'objet (soit le pool ou la file d'attente) fonctionnerait également, mais vous devriez monkeypatch pool._taskqueue._maxsize y pool._taskqueue._sem de sorte qu'il serait assez fragile :

pool._taskqueue._maxsize = maxsize
pool._taskqueue._sem = BoundedSemaphore(maxsize)

3voto

Roger Dahl Points 8326

Attendre si pool._taskqueue est supérieur à la taille souhaitée :

import multiprocessing
import time

import numpy as np

def f(a,b):
    return np.linalg.solve(a,b)

def test(max_apply_size=100):
    p = multiprocessing.Pool()
    for _ in range(1000):
        p.apply_async(f, (np.random.rand(1000,1000),np.random.rand(1000)))

        while p._taskqueue.qsize() > max_apply_size:
            time.sleep(1)

    p.close()
    p.join()

if __name__ == '__main__':
    test()

1voto

nijave Points 419

Voici un singe Parcheando alternatif à la première réponse :

import queue
from multiprocessing.pool import ThreadPool as Pool

class PatchedQueue():
  """
  Wrap stdlib queue and return a Queue(maxsize=...)
  when queue.SimpleQueue is accessed
  """

  def __init__(self, simple_queue_max_size=5000):
    self.simple_max = simple_queue_max_size  

  def __getattr__(self, attr):
    if attr == "SimpleQueue":
      return lambda: queue.Queue(maxsize=self.simple_max)
    return getattr(queue, attr)

class BoundedPool(Pool):
  # Override queue in this scope to use the patcher above
  queue = PatchedQueue()

pool = BoundedPool()
pool.apply_async(print, ("something",))

Cela fonctionne comme prévu pour Python 3.8 où le pool multiprocessus utilise queue.SimpleQueue pour configurer la file d'attente des tâches. Il semble que l'implémentation de multiprocessing.Pool peut avoir changé depuis la version 2.7

0voto

J.F. Sebastian Points 102961

Vous pourriez ajouter une file d'attente explicite avec le paramètre maxsize et utiliser queue.put() au lieu de pool.apply_async() dans ce cas. Les processus de travail pourraient alors :

for a, b in iter(queue.get, sentinel):
    # process it

Si vous souhaitez limiter le nombre d'arguments/résultats d'entrée créés qui sont en mémoire à environ le nombre de processus de travail actif, vous pouvez utiliser pool.imap*() des méthodes :

#!/usr/bin/env python
import multiprocessing
import numpy as np

def f(a_b):
    return np.linalg.solve(*a_b)

def main():
    args = ((np.random.rand(1000,1000), np.random.rand(1000))
            for _ in range(1000))
    p = multiprocessing.Pool()
    for result in p.imap_unordered(f, args, chunksize=1):
        pass
    p.close()
    p.join()

if __name__ == '__main__':
    main()

Prograide.com

Prograide est une communauté de développeurs qui cherche à élargir la connaissance de la programmation au-delà de l'anglais.
Pour cela nous avons les plus grands doutes résolus en français et vous pouvez aussi poser vos propres questions ou résoudre celles des autres.

Powered by:

X