67 votes

Python Multiprocessing.Pool itération paresseuse

Je m'interroge sur la façon dont la classe Multiprocessing.Pool de python fonctionne avec map, imap et map_async. Mon problème particulier est que je veux map sur un itérateur qui crée des objets lourds en mémoire, et que je ne veux pas que tous ces objets soient générés en mémoire en même temps. Je voulais voir si les différentes fonctions map() allaient tordre mon itérateur à sec, ou appeler intelligemment la fonction next() uniquement lorsque les processus enfants avancent lentement, j'ai donc bricolé quelques tests en ce sens :

def g():
  for el in xrange(100):
    print el
    yield el

def f(x):
  time.sleep(1)
  return x*x

if __name__ == '__main__':
  pool = Pool(processes=4)              # start 4 worker processes
  go = g()
  g2 = pool.imap(f, go)
  g2.next()

Et ainsi de suite avec map, imap, et map_async. Il s'agit cependant de l'exemple le plus flagrant, car le simple fait d'appeler next() une seule fois sur g2 imprime tous les éléments de mon générateur g(), alors que si imap faisait cela "paresseusement", je m'attendrais à ce qu'il n'appelle go.next() qu'une seule fois, et n'imprime donc que "1".

Quelqu'un peut-il clarifier ce qui se passe, et s'il existe un moyen de faire en sorte que le pool de processus évalue "paresseusement" l'itérateur selon les besoins ?

Merci,

Gabe

0 votes

Après avoir retiré le time.sleep et l'ajout d'un print os.getpid(), x sur f le comportement est encore plus bizarre, parfois seulement 2 ou 3 PID différents sont imprimés et font toujours un nombre différent d'itérations... BTW quelle version de Python utilisez-vous ?

0 votes

Python 2.6.6 (r266:84292, Dec 26 2010, 22:31:48) Installation standard debian.

39voto

unutbu Points 222216

Examinons d'abord la fin du programme.

Le module de multitraitement utilise atexit d'appeler multiprocessing.util._exit_function lorsque votre programme se termine.

Si vous retirez g2.next() votre programme se termine rapidement.

Le site _exit_function appelle éventuellement Pool._terminate_pool . Le thread principal change l'état de pool._task_handler._state de RUN a TERMINATE . Pendant ce temps, le pool._task_handler Le fil est en boucle dans Pool._handle_tasks et se retire lorsqu'il atteint la condition

            if thread._state:
                debug('task handler found thread._state != RUN')
                break

(Voir /usr/lib/python2.6/multiprocessing/pool.py)

C'est ce qui empêche le gestionnaire de tâches de consommer entièrement votre générateur, g() . Si vous regardez dans Pool._handle_tasks vous verrez

        for i, task in enumerate(taskseq):
            ...
            try:
                put(task)
            except IOError:
                debug('could not put task on queue')
                break

C'est le code qui consomme votre générateur. ( taskseq n'est pas exactement votre générateur, mais comme taskseq est consommé, votre générateur l'est aussi).

En revanche, lorsque vous appelez g2.next() le fil principal appelle IMapIterator.next et attend lorsqu'il atteint self._cond.wait(timeout) .

Que le fil principal attende au lieu de d'appeler _exit_function est ce qui permet au thread du gestionnaire de tâches de fonctionner normalement, c'est-à-dire de consommer pleinement le générateur au fur et à mesure qu'il put dans le cadre de la worker s' inqueue dans le Pool._handle_tasks fonction.

L'essentiel est que tous les Pool Les fonctions map consomment la totalité de l'itérable qui leur est donné. Si vous souhaitez consommer le générateur par morceaux, vous pouvez faire ceci à la place :

import multiprocessing as mp
import itertools
import time

def g():
    for el in xrange(50):
        print el
        yield el

def f(x):
    time.sleep(1)
    return x * x

if __name__ == '__main__':
    pool = mp.Pool(processes=4)              # start 4 worker processes
    go = g()
    result = []
    N = 11
    while True:
        g2 = pool.map(f, itertools.islice(go, N))
        if g2:
            result.extend(g2)
            time.sleep(1)
        else:
            break
    print(result)

3 votes

Excellente réponse, j'ai fini par réimplémenter un pool de threads qui consomme élément par élément en attendant, mais votre solution islice m'aurait demandé beaucoup moins de travail, oh bien :-). J'ai essayé de regarder un peu dans pool.py et j'ai remarqué qu'en effet les fonctions map/imap/map_async semblent consommer l'itérateur tout de suite. Je ne sais pas si c'est vraiment nécessaire, surtout dans le cas de la fonction standard Pool.map() ?

2 votes

@Gabe : Pour consommer l'itérateur juste à temps, je pense qu'un mécanisme de signalisation supplémentaire devrait être codé dans Pool pour indiquer au gestionnaire de tâches quand put plus de tâches dans le inqueue . Peut-être que c'est possible, mais que cela n'existe pas actuellement dans le domaine de la santé. Pool et pourrait aussi ralentir un peu le processus.

0 votes

En effet, ma solution était de créer une file d'attente de tâches de taille N*taille_du_pool et de jouer avec N jusqu'à ce que la file d'attente ait l'air de conserver un bon tampon. Bien sûr, cela dépend de la tâche et je peux comprendre que l'auteur du code Pool n'ait pas voulu s'en occuper. Merci pour votre réponse !

5voto

neo Points 586

Ce que vous voulez est mis en œuvre dans le NuMap à partir du site web :

NuMap est un système parallèle (basé sur des threads ou des processus, local ou distant), tamponné, multi-tâches, itertools.imap ou multiprocessing.Pool.imap remplacement de fonction. Comme imap, il évalue une fonction sur des éléments de une séquence ou un itérable, et il le fait paresseusement. La paresse peut être ajustée via les arguments "stride" et "buffer".

4voto

GrantJ Points 888

J'avais aussi ce problème et j'ai été déçu d'apprendre que la carte consomme tous ses éléments. J'ai codé une fonction qui consomme l'itérateur paresseusement en utilisant le type de données Queue en multitraitement. C'est similaire à ce que @unutbu décrit dans un commentaire de sa réponse mais, comme il le souligne, cela souffre de l'absence de mécanisme de rappel pour recharger la file d'attente. Le type de données Queue expose à la place un paramètre de timeout et j'ai utilisé 100 millisecondes avec un bon effet.

from multiprocessing import Process, Queue, cpu_count
from Queue import Full as QueueFull
from Queue import Empty as QueueEmpty

def worker(recvq, sendq):
    for func, args in iter(recvq.get, None):
        result = func(*args)
        sendq.put(result)

def pool_imap_unordered(function, iterable, procs=cpu_count()):
    # Create queues for sending/receiving items from iterable.

    sendq = Queue(procs)
    recvq = Queue()

    # Start worker processes.

    for rpt in xrange(procs):
        Process(target=worker, args=(sendq, recvq)).start()

    # Iterate iterable and communicate with worker processes.

    send_len = 0
    recv_len = 0
    itr = iter(iterable)

    try:
        value = itr.next()
        while True:
            try:
                sendq.put((function, value), True, 0.1)
                send_len += 1
                value = itr.next()
            except QueueFull:
                while True:
                    try:
                        result = recvq.get(False)
                        recv_len += 1
                        yield result
                    except QueueEmpty:
                        break
    except StopIteration:
        pass

    # Collect all remaining results.

    while recv_len < send_len:
        result = recvq.get()
        recv_len += 1
        yield result

    # Terminate worker processes.

    for rpt in xrange(procs):
        sendq.put(None)

Cette solution présente l'avantage de ne pas grouper les demandes à Pool.map. Un travailleur individuel ne peut pas empêcher les autres de progresser. YMMV. Notez que vous pouvez utiliser un objet différent pour signaler la fin des travailleurs. Dans l'exemple, j'ai utilisé None.

Testé sur "Python 2.7 (r27:82525, 4 Jul 2010, 09:01:59) [MSC v.1500 32 bit (Intel)] sur win32".

0 votes

J'ai vérifié sur Python 3.3 et ni l'un ni l'autre imap ni imap_unordered ne consomme pas tous les arguments avant de lancer la fonction mappée, bien que map fait.

0 votes

+1 C'est presque ce dont j'ai besoin, mais malheureusement j'ai besoin de résultats ordonnés.

0 votes

Au lieu de régler les délais d'attente des get/put pour les files d'attente d'entrée et de sortie, je fixe normalement 1) une taille fixe pour les deux files d'attente, et 2) je laisse les get/put bloquer si la file d'attente est vide/pleine. De cette façon, il n'y a pas besoin de régler les délais d'attente. Il est seulement nécessaire de vérifier le nombre d'éléments entrant dans la file d'attente et sortant de la file d'attente. L'ordre correct est donc le suivant 1) démarrer les travailleurs ; 2) démarrer le collecteur de sortie de la file d'attente ; 3) itérer sur l'entrée et remplir la file d'attente d'entrée.

1voto

Vitaly Fadeev Points 450

Dans cet exemple (voir le code, s'il vous plaît) 2 travailleurs.

Le pool fonctionne comme prévu : quand le travailleur est libre, il fait l'itération suivante.

Ce code est identique à celui de la rubrique, à une exception près : taille de l'argument = 64 k.

64 k - taille par défaut du tampon de la socket.

import itertools
from multiprocessing import Pool
from time import sleep

def f( x ):
    print( "f()" )
    sleep( 3 )
    return x

def get_reader():
    for x in range( 10 ):
        print( "readed: ", x )
        value = " " * 1024 * 64 # 64k
        yield value

if __name__ == '__main__':

    p = Pool( processes=2 )

    data = p.imap( f, get_reader() )

    p.close()
    p.join()

Prograide.com

Prograide est une communauté de développeurs qui cherche à élargir la connaissance de la programmation au-delà de l'anglais.
Pour cela nous avons les plus grands doutes résolus en français et vous pouvez aussi poser vos propres questions ou résoudre celles des autres.

Powered by:

X