17 votes

Le multiprocessus dans un pipeline bien fait

J'aimerais savoir comment le multiprocessing se fait correctement. En supposant que je dispose d'une liste [1,2,3,4,5] généré par la fonction f1 qui est écrit dans un Queue (cercle vert à gauche). Je lance maintenant deux processus qui tirent sur cette file d'attente (en exécutant f2 dans les processus). Ils traitent les données, par exemple en doublant la valeur, et les écrivent dans la deuxième file d'attente. Maintenant, la fonction f3 lit ces données et les imprime.

layout of the data flow

À l'intérieur des fonctions, il y a une sorte de boucle, qui essaie de lire la file d'attente indéfiniment. Comment arrêter ce processus ?

Idée 1

f1 n'envoie pas seulement la liste, mais aussi un None ou un objet client, class PipelineTerminator: pass ou quelque chose de ce genre, qui se propage jusqu'en bas de l'échelle. f3 attend maintenant None à venir, quand il est là, il sort de la boucle. Problème : il est possible que l'un des deux f2 lit et propage le None tandis que l'autre traite encore un numéro. La dernière valeur est alors perdue.

Idée 2

f3 es f1 . La fonction f1 génère les données et les tuyaux, lance les processus avec f2 et alimente toutes les données. Après l'apparition et l'alimentation, il écoute sur le second tuyau, comptant et traitant simplement les objets reçus. Comme il sait combien de données ont été transmises, il peut mettre fin aux processus qui exécutent f2 . Mais si l'objectif est de mettre en place une filière de traitement, les différentes étapes doivent être séparables. C'est ainsi que f1 , f2 y f3 sont des éléments différents d'un pipeline, et les étapes coûteuses sont effectuées en parallèle.

Idée 3

pipeline idea 3

Chaque élément du pipeline est une fonction, cette fonction crée des processus comme elle le souhaite et est responsable de leur gestion. Elle sait combien de données sont entrées et combien de données ont été renvoyées (avec la fonction yield peut-être). Il n'y a donc aucun risque à propager un None objet.

setup child processes 

execute thread one and two and wait until both finished

thread 1:
    while True:
        pull from input queue
        if None: break and set finished_flag
        else: push to queue1 and increment counter1

thread 2:
    while True:
        pull from queue2
        increment counter2
        yield result
        if counter1 == counter2 and finished_flag: break

when both threads finished: kill process pool and return.

(Au lieu d'utiliser des fils, on peut peut-être imaginer une solution plus intelligente).

Alors ...

J'ai mis en œuvre une solution suivant l'idée 2, en alimentant et en attendant que les résultats arrivent, mais il ne s'agissait pas vraiment d'un pipeline avec des fonctions indépendantes branchées ensemble. Cela a fonctionné pour la tâche que je devais gérer, mais c'était difficile à maintenir.

J'aimerais que vous me disiez maintenant comment vous mettez en œuvre les pipelines (facilement dans un processus avec des fonctions de génération et ainsi de suite, mais avec plusieurs processus ?

12voto

Velimir Mlaker Points 1419

Avec MPipe il suffit de faire ceci :

from mpipe import OrderedStage, Pipeline

def f1(value):
    return value * 2

def f2(value):
    print(value)

s1 = OrderedStage(f1, size=2)
s2 = OrderedStage(f2)
p = Pipeline(s1.link(s2))

for task in 1, 2, 3, 4, 5, None:
    p.put(task)

Ce qui précède s'exécute 4 processus :

  • deux pour la première étape (fonction f1 )
  • un pour la deuxième étape (fonction f2 )
  • y un plus pour le programme principal qui alimente le pipeline.

Les Livre de cuisine MPipe explique comment les processus sont arrêtés en interne à l'aide de l'outil None comme dernière tâche.

Pour exécuter le code, installez MPipe :

virtualenv venv
venv/bin/pip install mpipe
venv/bin/python prog.py

Sortie :

2
4
6
8
10

2voto

unutbu Points 222216

Pour l'idée 1, que diriez-vous de.. :

import multiprocessing as mp

sentinel=None

def f2(inq,outq):
    while True:
        val=inq.get()
        if val is sentinel:
            break
        outq.put(val*2)

def f3(outq):
    while True:
        val=outq.get()
        if val is sentinel:
            break
        print(val)

def f1():
    num_workers=2
    inq=mp.Queue()
    outq=mp.Queue()
    for i in range(5):
        inq.put(i)
    for i in range(num_workers):        
        inq.put(sentinel)
    workers=[mp.Process(target=f2,args=(inq,outq)) for i in range(2)]
    printer=mp.Process(target=f3,args=(outq,))
    for w in workers:
        w.start()
    printer.start()
    for w in workers:
        w.join()
    outq.put(sentinel)
    printer.join()

if __name__=='__main__':
    f1()

La seule différence par rapport à la description de l'idée 1 est que f2 s'échappe de la while-loop lorsqu'il reçoit la sentinelle (se terminant ainsi lui-même). f1 jusqu'à ce que les travailleurs aient terminé (en utilisant w.join() ) et envoie ensuite f3 la sentinelle (en lui faisant signe de sortir de son while-loop ).

1voto

jsbueno Points 22212

Qu'y aurait-il de mal à utiliser l'idée 1, mais avec chaque processus travailleur (f2) qui mettrait un objet personnalisé avec son identifiant lorsqu'il est terminé ? Ensuite, f3 mettrait fin à ce processus, jusqu'à ce qu'il n'y en ait plus.

De plus, Python 3.2 a introduit le paquet concurrent.futures dans la bibliothèque standard, qui devrait permettre de faire ce que vous essayez de faire de la "bonne manière" (tm). http://docs.python.org/dev/library/concurrent.futures.html

Il est peut-être possible de trouver un backport de concurrent.futures pour la série Python 2.x.

1voto

Winand Points 1037

J'utilise concurent.futures et trois piscines, qui sont reliées entre elles par future.add_done_callback . J'attends ensuite la fin du processus en appelant shutdown sur chaque piscine.

from concurrent.futures import ProcessPoolExecutor
import time
import random

def worker1(arg):
    time.sleep(random.random())
    return arg

def pipe12(future):
    pool2.submit(worker2, future.result()).add_done_callback(pipe23)

def worker2(arg):
    time.sleep(random.random())
    return arg

def pipe23(future):
    pool3.submit(worker3, future.result()).add_done_callback(spout)

def worker3(arg):
    time.sleep(random.random())
    return arg

def spout(future):
    print(future.result())

if __name__ == "__main__":
    __spec__ = None  # Fix multiprocessing in Spyder's IPython
    pool1 = ProcessPoolExecutor(2)
    pool2 = ProcessPoolExecutor(2)
    pool3 = ProcessPoolExecutor(2)
    for i in range(10):
        pool1.submit(worker1, i).add_done_callback(pipe12)
    pool1.shutdown()
    pool2.shutdown()
    pool3.shutdown()

0voto

RaJa Points 708

Le moyen le plus simple d'y parvenir est d'utiliser des sémaphores.

F1

F1 consiste à remplir votre "file d'attente" avec les données que vous souhaitez traiter. À la fin de cette poussée, vous placez n mots-clés "Stop" dans votre file d'attente. n = 2 pour votre exemple, mais généralement le nombre de travailleurs impliqués. Le code ressemblerait à ceci :

for n in no_of_processes:
    tasks.put('Stop')

F2

F2 est tirée de la file d'attente fournie par un get -commande. L'élément est pris dans la file d'attente et supprimé dans la file d'attente. Maintenant, vous pouvez mettre le pop dans une boucle tout en faisant attention au signal d'arrêt :

for elem in iter(tasks.get, 'STOP'):
   do something

F3

Celui-ci est un peu délicat. Vous pourriez générer un sémaphore dans F2 qui agirait comme un signal vers F3. Mais vous ne savez pas quand ce signal arrive et vous risquez de perdre des données. Cependant, F3 tire les données de la même manière que F2 et vous pouvez les mettre dans un fichier try... except -déclaration. queue.get soulève un queue.Empty lorsqu'il n'y a pas d'éléments dans la file d'attente. Ainsi, votre tirage dans F3 ressemblerait à ce qui suit :

while control:
    try:
        results.get()
    except queue.Empty:
        control = False

Avec tasks y results étant des files d'attente. Vous n'avez donc besoin de rien qui ne soit pas déjà inclus dans Python.

Prograide.com

Prograide est une communauté de développeurs qui cherche à élargir la connaissance de la programmation au-delà de l'anglais.
Pour cela nous avons les plus grands doutes résolus en français et vous pouvez aussi poser vos propres questions ou résoudre celles des autres.

Powered by:

X