378 votes

Python multitraitement de décapage de l'erreur

Je suis désolé que je ne peux pas reproduire l'erreur avec un exemple simple, et mon code est trop compliqué pour le poste. Si je lance le programme dans IPython shell au lieu de l'ordinaire python, les choses fonctionnent bien.

J'ai regardé quelques notes précédentes sur ce problème. Ils ont tous été provoqués par l'utilisation de la piscine à l'appel de la fonction définie à l'intérieur d'une fonction de classe. Mais ce n'est pas le cas pour moi.

Exception in thread Thread-3:
Traceback (most recent call last):
  File "/usr/lib64/python2.7/threading.py", line 552, in __bootstrap_inner
    self.run()
  File "/usr/lib64/python2.7/threading.py", line 505, in run
    self.__target(*self.__args, **self.__kwargs)
  File "/usr/lib64/python2.7/multiprocessing/pool.py", line 313, in _handle_tasks
    put(task)
PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed

Je vous serais reconnaissant de toute aide.

Mise à JOUR: La fonction que j'ai cornichon est définie au plus haut niveau du module. Bien qu'il appelle une fonction qui contient une fonction imbriquée. j'.e, f() appelle g() appelle h() qui a une fonction imbriquée (i), et je fais appel à la piscine.apply_async(f). f(), g(), h() sont définies au plus haut niveau. J'ai essayé exemple plus simple avec ce modèle et il fonctionne bien.

431voto

unutbu Points 222216

Voici une liste de ce qui peut être nettoyée. En particulier, les fonctions ne sont picklable si ils sont définis au premier niveau d'un module.

Ce morceau de code:

import multiprocessing as mp

class Foo():
    @staticmethod
    def work(self):
        pass

pool = mp.Pool()
foo = Foo()
pool.apply_async(foo.work)
pool.close()
pool.join()

génère une erreur presque identique à celui que vous avez posté:

Exception in thread Thread-2:
Traceback (most recent call last):
  File "/usr/lib/python2.7/threading.py", line 552, in __bootstrap_inner
    self.run()
  File "/usr/lib/python2.7/threading.py", line 505, in run
    self.__target(*self.__args, **self.__kwargs)
  File "/usr/lib/python2.7/multiprocessing/pool.py", line 315, in _handle_tasks
    put(task)
PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed

Le problème est que l' pool méthodes utilisent tous un queue.Queue pour passer les tâches du processus de travail. Tout ce qui va par le biais de l' queue.Queue doit être cliquable, et foo.work n'est pas picklable car elle n'est pas définie au niveau supérieur du module.

Il peut être fixé par la définition d'une fonction de niveau supérieur, qui demande foo.work():

def work(foo):
    foo.work()

pool.apply_async(work,args=(foo,))

Notez que foo est cliquable, depuis Foo est définie au plus haut niveau et foo.__dict__ est picklable.

141voto

Mike McKerns Points 965

Je ne l'utiliserais pathos.multiprocesssing, au lieu de multiprocessing. pathos.multiprocessing est un fork de multiprocessing qui utilise dill. dill pouvez sérialiser presque rien en python, de sorte que vous êtes en mesure d'envoyer beaucoup plus autour en parallèle. L' pathos fourche a également la possibilité de travailler directement avec les multiples argument de fonctions, que vous avez besoin pour des méthodes de la classe.

>>> from pathos.multiprocessing import ProcessingPool as Pool
>>> p = Pool(4)
>>> class Test(object):
...   def plus(self, x, y): 
...     return x+y
... 
>>> t = Test()
>>> p.map(t.plus, x, y)
[4, 6, 8, 10]
>>> 
>>> class Foo(object):
...   @staticmethod
...   def work(self, x):
...     return x+1
... 
>>> f = Foo()
>>> p.apipe(f.work, f, 100)
<processing.pool.ApplyResult object at 0x10504f8d0>
>>> res = _
>>> res.get()
101

Obtenez de l' pathos (et si vous le souhaitez, dill) ici: https://github.com/uqfoundation

42voto

rocksportrocker Points 3031

Comme d'autres l'ont dit - multiprocessing ne peut transférer des objets Python pour les processus de travail qui peuvent être traitées. Si vous ne pouvez pas réorganiser votre code comme décrit par unutbu, vous pouvez utiliser dills étendu décapage/unpickling des capacités pour le transfert de données (en particulier un code de données) comme je l'ai montré ci-dessous.

Cette solution ne nécessite que l'installation d' dill et pas les autres bibliothèques pathos:

import os
from multiprocessing import Pool

import dill

def run_dill_encoded(what):
    fun, args = dill.loads(what)
    return fun(*args)

def apply_async(pool, fun, args):
    return pool.apply_async(run_dill_encoded, (dill.dumps((fun, args)),))

if __name__ == "__main__":

    pool = Pool(processes=5)

    # asyn execution of lambda
    jobs = []
    for i in range(10):
        job = apply_async(pool, lambda a, b: (a, b, a * b), (i, i + 1))
        jobs.append(job)

    for job in jobs:
        print job.get()
    print

    # async execution of static method

    class O(object):

        @staticmethod
        def calc():
            return os.getpid()

    jobs = []
    for i in range(10):
        job = apply_async(pool, O.calc, ())
        jobs.append(job)

    for job in jobs:
        print job.get()

31voto

Ezekiel Kruglick Points 398

J'ai trouvé que je peux aussi générer exactement ce que la sortie d'erreur sur un parfaitement de travail de morceau de code en essayant d'utiliser le profiler.

Notez que c'était sur Windows (où la fourche est un peu moins élégant).

J'ai été en cours d'exécution:

python -m profile -o output.pstats <script> 

Et a constaté que la suppression de la de profilage à l'écart de l'erreur et de placer le profilage restauré. A été me batty trop parce que je savais que le code utilisé pour le travail. J'ai été vérifier pour voir si quelque chose avait mis à jour pool.py... ensuite eu un sentiment de descente et éliminé le profilage et qu'il a été.

Poster ici pour les archives, au cas où quelqu'un d'autre s'exécute.

1voto

Tim Swast Points 3697

Êtes-vous en passant un tableau numpy des chaînes par hasard?

J'ai eu ce même message d'erreur quand je passe un tableau qui contient une chaîne vide. Je pense que c'est peut être à cause de ce bug: http://projects.scipy.org/numpy/ticket/1658

Prograide.com

Prograide est une communauté de développeurs qui cherche à élargir la connaissance de la programmation au-delà de l'anglais.
Pour cela nous avons les plus grands doutes résolus en français et vous pouvez aussi poser vos propres questions ou résoudre celles des autres.

Powered by:

X