282 votes

Comment puis-je récupérer la valeur de retour d'une fonction passée à multiprocessing.Process ?

Dans l'exemple de code ci-dessous, j'aimerais récupérer la valeur de retour de la fonction worker . Comment puis-je m'y prendre ? Où cette valeur est-elle stockée ?

Exemple de code :

import multiprocessing

def worker(procnum):
    '''worker function'''
    print str(procnum) + ' represent!'
    return procnum

if __name__ == '__main__':
    jobs = []
    for i in range(5):
        p = multiprocessing.Process(target=worker, args=(i,))
        jobs.append(p)
        p.start()

    for proc in jobs:
        proc.join()
    print jobs

Sortie :

0 represent!
1 represent!
2 represent!
3 represent!
4 represent!
[<Process(Process-1, stopped)>, <Process(Process-2, stopped)>, <Process(Process-3, stopped)>, <Process(Process-4, stopped)>, <Process(Process-5, stopped)>]

Je ne parviens pas à trouver l'attribut correspondant dans les objets stockés dans la base de données de l'UE. jobs .

290voto

vartec Points 53382

Utilisez variable partagée pour communiquer. Par exemple, comme ceci :

import multiprocessing

def worker(procnum, return_dict):
    """worker function"""
    print(str(procnum) + " represent!")
    return_dict[procnum] = procnum

if __name__ == "__main__":
    manager = multiprocessing.Manager()
    return_dict = manager.dict()
    jobs = []
    for i in range(5):
        p = multiprocessing.Process(target=worker, args=(i, return_dict))
        jobs.append(p)
        p.start()

    for proc in jobs:
        proc.join()
    print(return_dict.values())

82voto

Mark Points 1278

Je pense que l'approche suggérée par @sega_sai est la meilleure. Mais il faut vraiment un exemple de code, alors voilà :

import multiprocessing
from os import getpid

def worker(procnum):
    print('I am number %d in process %d' % (procnum, getpid()))
    return getpid()

if __name__ == '__main__':
    pool = multiprocessing.Pool(processes = 3)
    print(pool.map(worker, range(5)))

Ce qui imprimera les valeurs de retour :

I am number 0 in process 19139
I am number 1 in process 19138
I am number 2 in process 19140
I am number 3 in process 19139
I am number 4 in process 19140
[19139, 19138, 19140, 19139, 19140]

Si vous êtes familier avec map (le Python 2 intégré), cela ne devrait pas être trop difficile. Sinon, jetez un coup d'œil à Lien de sega_Sai .

Notez la faible quantité de code nécessaire. (Notez également comment les processus sont réutilisés).

54voto

Matthew Moisen Points 1057

Pour tous ceux qui cherchent à savoir comment obtenir une valeur d'un Process en utilisant Queue :

import multiprocessing

ret = {'foo': False}

def worker(queue):
    ret = queue.get()
    ret['foo'] = True
    queue.put(ret)

if __name__ == '__main__':
    queue = multiprocessing.Queue()
    queue.put(ret)
    p = multiprocessing.Process(target=worker, args=(queue,))
    p.start()
    p.join()
    print(queue.get())  # Prints {"foo": True}

Notez que dans Windows ou Jupyter Notebook, avec multithreading vous devez l'enregistrer comme un fichier et exécuter le fichier. Si vous le faites dans une invite de commande, vous verrez une erreur comme celle-ci :

 AttributeError: Can't get attribute 'worker' on <module '__main__' (built-in)>

42voto

9000 Points 497

Pour une raison quelconque, je n'ai pas pu trouver d'exemple général sur la façon de faire cela avec Queue n'importe où (même les exemples de la doc de Python ne génèrent pas de processus multiples), alors voici ce que j'ai réussi à faire fonctionner après une dizaine d'essais :

def add_helper(queue, arg1, arg2): # the func called in child processes
    ret = arg1 + arg2
    queue.put(ret)

def multi_add(): # spawns child processes
    q = Queue()
    processes = []
    rets = []
    for _ in range(0, 100):
        p = Process(target=add_helper, args=(q, 1, 2))
        processes.append(p)
        p.start()
    for p in processes:
        ret = q.get() # will block
        rets.append(ret)
    for p in processes:
        p.join()
    return rets

Queue est une file d'attente bloquante et thread-safe que vous pouvez utiliser pour stocker les valeurs de retour des processus enfants. Vous devez donc passer la file d'attente à chaque processus. Une chose moins évidente ici est que vous devez get() de la file d'attente avant de join el Process es, sinon la file d'attente se remplit et bloque tout.

Mise à jour pour ceux qui sont orientés objet (testé dans Python 3.4) :

from multiprocessing import Process, Queue

class Multiprocessor():

    def __init__(self):
        self.processes = []
        self.queue = Queue()

    @staticmethod
    def _wrapper(func, queue, args, kwargs):
        ret = func(*args, **kwargs)
        queue.put(ret)

    def run(self, func, *args, **kwargs):
        args2 = [func, self.queue, args, kwargs]
        p = Process(target=self._wrapper, args=args2)
        self.processes.append(p)
        p.start()

    def wait(self):
        rets = []
        for p in self.processes:
            ret = self.queue.get()
            rets.append(ret)
        for p in self.processes:
            p.join()
        return rets

# tester
if __name__ == "__main__":
    mp = Multiprocessor()
    num_proc = 64
    for _ in range(num_proc): # queue up multiple tasks running `sum`
        mp.run(sum, [1, 2, 3, 4, 5])
    ret = mp.wait() # get all results
    print(ret)
    assert len(ret) == num_proc and all(r == 15 for r in ret)

37voto

David Cullen Points 5279

Cet exemple montre comment utiliser une liste de multiprocessing.Pipe pour renvoyer des chaînes de caractères à partir d'un nombre arbitraire de processus :

import multiprocessing

def worker(procnum, send_end):
    '''worker function'''
    result = str(procnum) + ' represent!'
    print result
    send_end.send(result)

def main():
    jobs = []
    pipe_list = []
    for i in range(5):
        recv_end, send_end = multiprocessing.Pipe(False)
        p = multiprocessing.Process(target=worker, args=(i, send_end))
        jobs.append(p)
        pipe_list.append(recv_end)
        p.start()

    for proc in jobs:
        proc.join()
    result_list = [x.recv() for x in pipe_list]
    print result_list

if __name__ == '__main__':
    main()

Sortie :

0 represent!
1 represent!
2 represent!
3 represent!
4 represent!
['0 represent!', '1 represent!', '2 represent!', '3 represent!', '4 represent!']

Cette solution utilise moins de ressources qu'une file d'attente multiprocesseur qui utilise

  • un tuyau
  • au moins une serrure
  • un tampon
  • un fil

ou un multiprocessing.SimpleQueue qui utilise

  • un tuyau
  • au moins une serrure

Il est très instructif d'examiner la source de chacun de ces types.

Prograide.com

Prograide est une communauté de développeurs qui cherche à élargir la connaissance de la programmation au-delà de l'anglais.
Pour cela nous avons les plus grands doutes résolus en français et vous pouvez aussi poser vos propres questions ou résoudre celles des autres.

Powered by:

X