J'ai aussi lutté avec ça. J'avais des fonctions comme membres de données d'une classe, comme exemple simplifié :
from multiprocessing import Pool
import itertools
pool = Pool()
class Example(object):
def __init__(self, my_add):
self.f = my_add
def add_lists(self, list1, list2):
# Needed to do something like this (the following line won't work)
return pool.map(self.f,list1,list2)
J'avais besoin d'utiliser la fonction self.f dans un appel à Pool.map() à partir de la même classe et self.f ne prenait pas un tuple comme argument. Comme cette fonction était intégrée dans une classe, je ne voyais pas comment écrire le type de wrapper suggéré par les autres réponses.
J'ai résolu ce problème en utilisant un wrapper différent qui prend un tuple/liste, où le premier élément est la fonction, et les autres éléments sont les arguments de cette fonction, appelé eval_func_tuple(f_args). En utilisant ceci, la ligne problématique peut être remplacée par return pool.map(eval_func_tuple, itertools.izip(itertools.repeat(self.f), list1, list2)). Voici le code complet :
Fichier : util.py
def add(a, b): return a+b
def eval_func_tuple(f_args):
"""Takes a tuple of a function and args, evaluates and returns result"""
return f_args[0](*f_args[1:])
Fichier : main.py
from multiprocessing import Pool
import itertools
import util
pool = Pool()
class Example(object):
def __init__(self, my_add):
self.f = my_add
def add_lists(self, list1, list2):
# The following line will now work
return pool.map(util.eval_func_tuple,
itertools.izip(itertools.repeat(self.f), list1, list2))
if __name__ == '__main__':
myExample = Example(util.add)
list1 = [1, 2, 3]
list2 = [10, 20, 30]
print myExample.add_lists(list1, list2)
L'exécution de main.py donnera [11, 22, 33]. N'hésitez pas à améliorer ceci, par exemple eval_func_tuple pourrait aussi être modifié pour prendre des arguments de mots-clés.
Par ailleurs, dans une autre réponse, la fonction "parmap" peut être rendue plus efficace dans le cas où le nombre de processus est supérieur au nombre de CPU disponibles. Je copie une version éditée ci-dessous. C'est mon premier message et je ne savais pas si je devais modifier directement la réponse originale. J'ai également renommé certaines variables.
from multiprocessing import Process, Pipe
from itertools import izip
def spawn(f):
def fun(pipe,x):
pipe.send(f(x))
pipe.close()
return fun
def parmap(f,X):
pipe=[Pipe() for x in X]
processes=[Process(target=spawn(f),args=(c,x)) for x,(p,c) in izip(X,pipe)]
numProcesses = len(processes)
processNum = 0
outputList = []
while processNum < numProcesses:
endProcessNum = min(processNum+multiprocessing.cpu_count(), numProcesses)
for proc in processes[processNum:endProcessNum]:
proc.start()
for proc in processes[processNum:endProcessNum]:
proc.join()
for proc,c in pipe[processNum:endProcessNum]:
outputList.append(proc.recv())
processNum = endProcessNum
return outputList
if __name__ == '__main__':
print parmap(lambda x:x**x,range(1,5))
1 votes
"ceci comme une fonction d'une classe" ? Pouvez-vous afficher le code qui génère l'erreur réelle ? Sans le code réel, nous ne pouvons que deviner ce que vous faites mal.
0 votes
D'une manière générale, il existe des modules de décapage plus puissants que le module pickle standard de Python (comme le module picloud module mentionné dans cette réponse ).
1 votes
J'ai eu un problème similaire avec les fermetures dans
IPython.Parallel
mais là, on peut contourner le problème en poussant les objets vers les nœuds. Il semble assez ennuyeux de contourner ce problème avec le multiprocessing.0 votes
Ici
calculate
est picklable, donc il semble que cela puisse être résolu en 1) créant un objet fonction avec un constructeur qui copie sur un objetcalculate
et ensuite 2) en passant une instance de cet objet fonction àPool
'smap
méthode. Non ?1 votes
Je ne pense pas que les "changements récents" de Python soient d'une quelconque utilité. Certaines limitations du
multiprocessing
sont dues à son objectif d'être une implémentation multiplateforme, et à l'absence d'un module de gestion de l'environnement.fork(2)
-comme l'appel système de Windows. Si vous ne vous souciez pas du support Win32, il peut y avoir une solution plus simple basée sur les processus. Ou si vous êtes prêt à utiliser des threads au lieu de processus, vous pouvez substituer l'optionfrom multiprocessing import Pool
avecfrom multiprocessing.pool import ThreadPool as Pool
.