197 votes

Multiprocessing : Comment utiliser Pool.map sur une fonction définie dans une classe ?

Quand j'exécute quelque chose comme :

from multiprocessing import Pool

p = Pool(5)
def f(x):
     return x*x

p.map(f, [1,2,3])

il fonctionne bien. Cependant, en mettant cela comme une fonction d'une classe :

class calculate(object):
    def run(self):
        def f(x):
            return x*x

        p = Pool()
        return p.map(f, [1,2,3])

cl = calculate()
print cl.run()

Donne l'erreur suivante :

Exception in thread Thread-1:
Traceback (most recent call last):
  File "/sw/lib/python2.6/threading.py", line 532, in __bootstrap_inner
    self.run()
  File "/sw/lib/python2.6/threading.py", line 484, in run
    self.__target(*self.__args, **self.__kwargs)
  File "/sw/lib/python2.6/multiprocessing/pool.py", line 225, in _handle_tasks
    put(task)
PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed

J'ai vu un article d'Alex Martelli traitant du même genre de problème, mais il n'était pas assez explicite.

1 votes

"ceci comme une fonction d'une classe" ? Pouvez-vous afficher le code qui génère l'erreur réelle ? Sans le code réel, nous ne pouvons que deviner ce que vous faites mal.

0 votes

D'une manière générale, il existe des modules de décapage plus puissants que le module pickle standard de Python (comme le module picloud module mentionné dans cette réponse ).

1 votes

J'ai eu un problème similaire avec les fermetures dans IPython.Parallel mais là, on peut contourner le problème en poussant les objets vers les nœuds. Il semble assez ennuyeux de contourner ce problème avec le multiprocessing.

91voto

klaus se Points 363

Je n'ai pas pu utiliser les codes postés jusqu'à présent car les codes utilisant "multiprocessing.Pool" ne fonctionnent pas avec les expressions lambda et les codes n'utilisant pas "multiprocessing.Pool" génèrent autant de processus qu'il y a d'éléments de travail.

J'ai adapté le code pour qu'il génère un nombre prédéfini de travailleurs et qu'il n'itère dans la liste d'entrée que s'il existe un travailleur libre. J'ai également activé le mode "démon" pour les travailleurs et ctrl-c fonctionne comme prévu.

import multiprocessing

def fun(f, q_in, q_out):
    while True:
        i, x = q_in.get()
        if i is None:
            break
        q_out.put((i, f(x)))

def parmap(f, X, nprocs=multiprocessing.cpu_count()):
    q_in = multiprocessing.Queue(1)
    q_out = multiprocessing.Queue()

    proc = [multiprocessing.Process(target=fun, args=(f, q_in, q_out))
            for _ in range(nprocs)]
    for p in proc:
        p.daemon = True
        p.start()

    sent = [q_in.put((i, x)) for i, x in enumerate(X)]
    [q_in.put((None, None)) for _ in range(nprocs)]
    res = [q_out.get() for _ in range(len(sent))]

    [p.join() for p in proc]

    return [x for i, x in sorted(res)]

if __name__ == '__main__':
    print(parmap(lambda i: i * 2, [1, 2, 3, 4, 6, 7, 8]))

4 votes

Comment faire pour qu'une barre de progression fonctionne correctement avec ce parmap fonction ?

2 votes

Une question : j'ai utilisé cette solution mais j'ai remarqué que les processus python que j'ai lancés sont restés actifs en mémoire. Avez-vous une idée rapide sur la façon de les tuer lorsque votre parmap se termine ?

1 votes

@klaus-se Je sais que l'on nous décourage de simplement dire merci dans les commentaires, mais votre réponse a trop de valeur pour moi, je n'ai pas pu résister. J'aimerais pouvoir vous donner plus qu'une seule réputation...

74voto

mrule Points 342

J'étais également gêné par les restrictions sur le type de fonctions que pool.map pouvait accepter. J'ai écrit ce qui suit pour contourner ces restrictions. Cela semble fonctionner, même pour une utilisation récursive de parmap.

from multiprocessing import Process, Pipe
from itertools import izip

def spawn(f):
    def fun(pipe, x):
        pipe.send(f(x))
        pipe.close()
    return fun

def parmap(f, X):
    pipe = [Pipe() for x in X]
    proc = [Process(target=spawn(f), args=(c, x)) for x, (p, c) in izip(X, pipe)]
    [p.start() for p in proc]
    [p.join() for p in proc]
    return [p.recv() for (p, c) in pipe]

if __name__ == '__main__':
    print parmap(lambda x: x**x, range(1, 5))

1 votes

Cela a très bien fonctionné pour moi, merci. J'ai trouvé une faiblesse : J'ai essayé d'utiliser parmap sur certaines fonctions qui passaient autour d'un defaultdict et j'ai obtenu le PicklingError à nouveau. Je n'ai pas trouvé de solution à ce problème, j'ai juste retravaillé mon code pour ne pas utiliser le defaultdict.

2 votes

Cela ne fonctionne pas dans Python 2.7.2 (par défaut, Jun 12 2011, 15:08:59) [MSC v.1500 32 bit (Intel)] sur win32

3 votes

Cela fonctionne sur Python 2.7.3 1 août 2012, 05:14:39. Cela ne fonctionne pas sur les itérables géants -> cela provoque une OSError : [Errno 24] Too many open files due to the number of pipes it opens.

42voto

EOL Points 24342

Il n'y a actuellement aucune solution à votre problème, pour autant que je sache : la fonction que vous donnez à map() doit être accessible par une importation de votre module. C'est pourquoi le code de Robert fonctionne : la fonction f() peut être obtenu en important le code suivant :

def f(x):
    return x*x

class Calculate(object):
    def run(self):
        p = Pool()
        return p.map(f, [1,2,3])

if __name__ == '__main__':
    cl = Calculate()
    print cl.run()

En fait, j'ai ajouté une section "principale", parce que celle-ci suit les recommandations pour la plate-forme Windows ("Assurez-vous que le module principal peut être importé en toute sécurité par un nouvel interpréteur Python sans provoquer d'effets secondaires involontaires").

J'ai aussi ajouté une lettre majuscule devant Calculate afin de suivre PEP 8 . :)

18voto

Bob McElrath Points 101

La solution proposée par mrule est correcte mais comporte un bogue : si l'enfant renvoie une grande quantité de données, il peut remplir le tampon du tuyau, bloquant ainsi l'activité de l'enfant. pipe.send() pendant que le parent attend que l'enfant quitte le site. pipe.join() . La solution est de lire les données de l'enfant avant de join() l'enfant. De plus, l'enfant doit fermer l'extrémité du tuyau du parent pour éviter un blocage. Le code ci-dessous corrige cela. Soyez également conscient que ce parmap crée un processus par élément dans X . Une solution plus avancée consiste à utiliser multiprocessing.cpu_count() de diviser X en un certain nombre de morceaux, puis fusionner les résultats avant de revenir. Je laisse cet exercice au lecteur pour ne pas gâcher la concision de la belle réponse de mrule. ;)

from multiprocessing import Process, Pipe
from itertools import izip

def spawn(f):
    def fun(ppipe, cpipe,x):
        ppipe.close()
        cpipe.send(f(x))
        cpipe.close()
    return fun

def parmap(f,X):
    pipe=[Pipe() for x in X]
    proc=[Process(target=spawn(f),args=(p,c,x)) for x,(p,c) in izip(X,pipe)]
    [p.start() for p in proc]
    ret = [p.recv() for (p,c) in pipe]
    [p.join() for p in proc]
    return ret

if __name__ == '__main__':
    print parmap(lambda x:x**x,range(1,5))

0 votes

Comment choisir le nombre de processus ?

0 votes

Cependant, il meurt assez rapidement à cause de l'erreur OSError: [Errno 24] Too many open files . Je pense qu'il doit y avoir une sorte de limite sur le nombre de processus pour que cela fonctionne correctement...

15voto

user753720 Points 88

J'ai aussi lutté avec ça. J'avais des fonctions comme membres de données d'une classe, comme exemple simplifié :

from multiprocessing import Pool
import itertools
pool = Pool()
class Example(object):
    def __init__(self, my_add): 
        self.f = my_add  
    def add_lists(self, list1, list2):
        # Needed to do something like this (the following line won't work)
        return pool.map(self.f,list1,list2)  

J'avais besoin d'utiliser la fonction self.f dans un appel à Pool.map() à partir de la même classe et self.f ne prenait pas un tuple comme argument. Comme cette fonction était intégrée dans une classe, je ne voyais pas comment écrire le type de wrapper suggéré par les autres réponses.

J'ai résolu ce problème en utilisant un wrapper différent qui prend un tuple/liste, où le premier élément est la fonction, et les autres éléments sont les arguments de cette fonction, appelé eval_func_tuple(f_args). En utilisant ceci, la ligne problématique peut être remplacée par return pool.map(eval_func_tuple, itertools.izip(itertools.repeat(self.f), list1, list2)). Voici le code complet :

Fichier : util.py

def add(a, b): return a+b

def eval_func_tuple(f_args):
    """Takes a tuple of a function and args, evaluates and returns result"""
    return f_args[0](*f_args[1:])  

Fichier : main.py

from multiprocessing import Pool
import itertools
import util  

pool = Pool()
class Example(object):
    def __init__(self, my_add): 
        self.f = my_add  
    def add_lists(self, list1, list2):
        # The following line will now work
        return pool.map(util.eval_func_tuple, 
            itertools.izip(itertools.repeat(self.f), list1, list2)) 

if __name__ == '__main__':
    myExample = Example(util.add)
    list1 = [1, 2, 3]
    list2 = [10, 20, 30]
    print myExample.add_lists(list1, list2)  

L'exécution de main.py donnera [11, 22, 33]. N'hésitez pas à améliorer ceci, par exemple eval_func_tuple pourrait aussi être modifié pour prendre des arguments de mots-clés.

Par ailleurs, dans une autre réponse, la fonction "parmap" peut être rendue plus efficace dans le cas où le nombre de processus est supérieur au nombre de CPU disponibles. Je copie une version éditée ci-dessous. C'est mon premier message et je ne savais pas si je devais modifier directement la réponse originale. J'ai également renommé certaines variables.

from multiprocessing import Process, Pipe  
from itertools import izip  

def spawn(f):  
    def fun(pipe,x):  
        pipe.send(f(x))  
        pipe.close()  
    return fun  

def parmap(f,X):  
    pipe=[Pipe() for x in X]  
    processes=[Process(target=spawn(f),args=(c,x)) for x,(p,c) in izip(X,pipe)]  
    numProcesses = len(processes)  
    processNum = 0  
    outputList = []  
    while processNum < numProcesses:  
        endProcessNum = min(processNum+multiprocessing.cpu_count(), numProcesses)  
        for proc in processes[processNum:endProcessNum]:  
            proc.start()  
        for proc in processes[processNum:endProcessNum]:  
            proc.join()  
        for proc,c in pipe[processNum:endProcessNum]:  
            outputList.append(proc.recv())  
        processNum = endProcessNum  
    return outputList    

if __name__ == '__main__':  
    print parmap(lambda x:x**x,range(1,5))

Prograide.com

Prograide est une communauté de développeurs qui cherche à élargir la connaissance de la programmation au-delà de l'anglais.
Pour cela nous avons les plus grands doutes résolus en français et vous pouvez aussi poser vos propres questions ou résoudre celles des autres.

Powered by:

X