229 votes

Impossible de récupérer <type 'instancemethod'> en utilisant le multitraitement de python Pool.map()

J'essaie d'utiliser la fonction Pool.map() du multitraitement pour répartir le travail simultanément. Lorsque j'utilise le code suivant, cela fonctionne bien :

import multiprocessing

def f(x):
    return x*x

def go():
    pool = multiprocessing.Pool(processes=4)             
    #result = pool.apply_async(self.f, [10])     
    #print result.get(timeout=1)           
    print pool.map(f, range(10))

if __name__== '__main__' :
    go()

Cependant, lorsque je l'utilise dans une approche plus orientée objet, cela ne fonctionne pas. Le message d'erreur qu'il donne est le suivant :

PicklingError: Can't pickle <type 'instancemethod'>: attribute lookup
__builtin__.instancemethod failed

Cela se produit lorsque mon programme principal est le suivant :

import someClass

if __name__== '__main__' :
    # lfq = lastFmQueries.lastFmQueries()
    # x = lfq.getUsersTopTracks("acet", "overall")
    sc = someClass.someClass()
    sc.go()

et ce qui suit est ma classe someClass :

import multiprocessing

class someClass(object):
    def __init__(self):
        pass

    def f(self, x):
        return x*x

    def go(self):
        pool = multiprocessing.Pool(processes=4)             
        #result = pool.apply_async(self.f, [10])     
        #print result.get(timeout=1)           
        print pool.map(self.f, range(10))

Quelqu'un sait-il quel pourrait être le problème, ou un moyen facile de le contourner ?

128voto

Alex Martelli Points 330805

Le problème est que le multiprocessing doit décaper les choses pour les répartir entre les processus, et les méthodes liées ne sont pas décapables. La solution de contournement (que vous la considériez comme "facile" ou non;-) est d'ajouter l'infrastructure à votre programme pour permettre à de telles méthodes d'être décapées, en l'enregistrant avec la fonction copy_reg méthode de bibliothèque standard.

Par exemple, la contribution de Steven Bethard à ce fil (vers la fin du fil de discussion) montre une approche parfaitement réalisable pour permettre le décapage/décapage de la méthode par le biais de copy_reg .

78voto

Mike McKerns Points 965

Toutes ces solutions sont laides car le multiprocessing et le pickling sont cassés et limités à moins de sortir de la bibliothèque standard.

Si vous utilisez une fourchette de multiprocessing appelé pathos.multiprocesssing vous pouvez utiliser directement les classes et les méthodes de classe dans l'interface utilisateur de multiprocessing. map fonctions. Cela s'explique par le fait que dill est utilisé à la place de pickle ou cPickle et dill peut sérialiser presque n'importe quoi en python.

pathos.multiprocessing fournit également une fonction de carte asynchrone et il peut map les fonctions à arguments multiples (par exemple map(math.pow, [1,2,3], [4,5,6]) )

Voir : Que peuvent faire ensemble le multitraitement et l'aneth ?

et : http://matthewrocklin.com/blog/work/2013/12/05/Parallelism-and-Serialization/

>>> import pathos.multiprocessing as mp
>>> p = mp.ProcessingPool(4)
>>> 
>>> def add(x,y):
...   return x+y
... 
>>> x = [0,1,2,3]
>>> y = [4,5,6,7]
>>> 
>>> p.map(add, x, y)
[4, 6, 8, 10]
>>> 
>>> class Test(object):
...   def plus(self, x, y): 
...     return x+y
... 
>>> t = Test()
>>> 
>>> p.map(Test.plus, [t]*4, x, y)
[4, 6, 8, 10]
>>> 
>>> p.map(t.plus, x, y)
[4, 6, 8, 10]

Et pour être explicite, vous pouvez faire exactement ce que vous vouliez faire au départ, et vous pouvez le faire depuis l'interpréteur, si vous le souhaitez.

>>> import pathos.multiprocessing as mp
>>> class someClass(object):
...   def __init__(self):
...     pass
...   def f(self, x):
...     return x*x
...   def go(self):
...     pool = mp.ProcessingPool(4)
...     print pool.map(self.f, range(10))
... 
>>> sc = someClass()
>>> sc.go()
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
>>> 

Obtenez le code ici : https://github.com/uqfoundation/pathos

36voto

dorvak Points 1202

Vous pouvez également définir un __call__() à l'intérieur de votre someClass() qui appelle someClass.go() et passe ensuite une instance de someClass() à la piscine. Cet objet est pickleable et cela fonctionne bien (pour moi)...

23voto

Eric H. Points 160

Quelques limitations à la solution de Steven Bethard :

Lorsque vous enregistrez la méthode de votre classe en tant que fonction, le destructeur de votre classe est appelé de manière surprenante chaque fois que le traitement de votre méthode est terminé. Donc si vous avez 1 instance de votre classe qui appelle n fois sa méthode, des membres peuvent disparaître entre 2 exécutions et vous pouvez obtenir un message malloc: *** error for object 0x...: pointer being freed was not allocated (par exemple, ouvrir un fichier membre) ou pure virtual method called, terminate called without an active exception (ce qui signifie que la durée de vie d'un objet membre que j'ai utilisé était plus courte que ce que je pensais). J'ai obtenu ce résultat en traitant avec n plus grand que la taille du pool. Voici un petit exemple :

from multiprocessing import Pool, cpu_count
from multiprocessing.pool import ApplyResult

# --------- see Stenven's solution above -------------
from copy_reg import pickle
from types import MethodType

def _pickle_method(method):
    func_name = method.im_func.__name__
    obj = method.im_self
    cls = method.im_class
    return _unpickle_method, (func_name, obj, cls)

def _unpickle_method(func_name, obj, cls):
    for cls in cls.mro():
        try:
            func = cls.__dict__[func_name]
        except KeyError:
            pass
        else:
            break
    return func.__get__(obj, cls)

class Myclass(object):

    def __init__(self, nobj, workers=cpu_count()):

        print "Constructor ..."
        # multi-processing
        pool = Pool(processes=workers)
        async_results = [ pool.apply_async(self.process_obj, (i,)) for i in range(nobj) ]
        pool.close()
        # waiting for all results
        map(ApplyResult.wait, async_results)
        lst_results=[r.get() for r in async_results]
        print lst_results

    def __del__(self):
        print "... Destructor"

    def process_obj(self, index):
        print "object %d" % index
        return "results"

pickle(MethodType, _pickle_method, _unpickle_method)
Myclass(nobj=8, workers=3)
# problem !!! the destructor is called nobj times (instead of once)

Sortie :

Constructor ...
object 0
object 1
object 2
... Destructor
object 3
... Destructor
object 4
... Destructor
object 5
... Destructor
object 6
... Destructor
object 7
... Destructor
... Destructor
... Destructor
['results', 'results', 'results', 'results', 'results', 'results', 'results', 'results']
... Destructor

Le site __call__ n'est pas aussi équivalente, car [None,...] est lu à partir des résultats :

from multiprocessing import Pool, cpu_count
from multiprocessing.pool import ApplyResult

class Myclass(object):

    def __init__(self, nobj, workers=cpu_count()):

        print "Constructor ..."
        # multiprocessing
        pool = Pool(processes=workers)
        async_results = [ pool.apply_async(self, (i,)) for i in range(nobj) ]
        pool.close()
        # waiting for all results
        map(ApplyResult.wait, async_results)
        lst_results=[r.get() for r in async_results]
        print lst_results

    def __call__(self, i):
        self.process_obj(i)

    def __del__(self):
        print "... Destructor"

    def process_obj(self, i):
        print "obj %d" % i
        return "result"

Myclass(nobj=8, workers=3)
# problem !!! the destructor is called nobj times (instead of once), 
# **and** results are empty !

Donc aucune des deux méthodes n'est satisfaisante...

16voto

torek Points 25463

Il existe un autre raccourci que vous pouvez utiliser, bien qu'il puisse être inefficace selon le contenu de vos instances de classe.

Comme tout le monde l'a dit, le problème est que le multiprocessing Le code doit mettre en pickle les choses qu'il envoie aux sous-processus qu'il a lancés, et le pickle ne fait pas de méthodes d'instance.

Toutefois, au lieu d'envoyer la méthode d'instance, vous pouvez envoyer l'instance de classe réelle, ainsi que le nom de la fonction à appeler, à une fonction ordinaire qui utilise ensuite la méthode d'instance. getattr pour appeler la méthode d'instance, créant ainsi la méthode liée dans l'environnement de l'utilisateur. Pool sous-processus. Ceci est similaire à la définition d'un __call__ sauf que vous pouvez appeler plus d'une fonction membre.

Voler le code de @EricH. à partir de sa réponse et l'annoter un peu (je l'ai retapé d'où tous les changements de nom et autres, pour une raison quelconque, cela semblait plus facile que de copier-coller :-) ) pour illustrer toute la magie :

import multiprocessing
import os

def call_it(instance, name, args=(), kwargs=None):
    "indirect caller for instance methods and multiprocessing"
    if kwargs is None:
        kwargs = {}
    return getattr(instance, name)(*args, **kwargs)

class Klass(object):
    def __init__(self, nobj, workers=multiprocessing.cpu_count()):
        print "Constructor (in pid=%d)..." % os.getpid()
        self.count = 1
        pool = multiprocessing.Pool(processes = workers)
        async_results = [pool.apply_async(call_it,
            args = (self, 'process_obj', (i,))) for i in range(nobj)]
        pool.close()
        map(multiprocessing.pool.ApplyResult.wait, async_results)
        lst_results = [r.get() for r in async_results]
        print lst_results

    def __del__(self):
        self.count -= 1
        print "... Destructor (in pid=%d) count=%d" % (os.getpid(), self.count)

    def process_obj(self, index):
        print "object %d" % index
        return "results"

Klass(nobj=8, workers=3)

La sortie montre qu'en effet, le constructeur est appelé une fois (dans le pid d'origine) et le destructeur est appelé 9 fois (une fois pour chaque copie faite = 2 ou 3 fois par processus de pool-worker selon les besoins, plus une fois dans le processus d'origine). C'est souvent OK, comme dans ce cas, puisque le pickler par défaut fait une copie de l'instance entière et la repeuple (semi-) secrètement - dans ce cas, en faisant :

obj = object.__new__(Klass)
obj.__dict__.update({'count':1})

-C'est la raison pour laquelle, même si le destructeur est appelé huit fois dans les trois processus de travail, il compte de 1 à 0 à chaque fois. Si nécessaire, vous pouvez fournir votre propre __setstate__ :

    def __setstate__(self, adict):
        self.count = adict['count']

dans ce cas par exemple.

Prograide.com

Prograide est une communauté de développeurs qui cherche à élargir la connaissance de la programmation au-delà de l'anglais.
Pour cela nous avons les plus grands doutes résolus en français et vous pouvez aussi poser vos propres questions ou résoudre celles des autres.

Powered by:

X