17 votes

Le multiprocessus dans un pipeline bien fait

J'aimerais savoir comment le multiprocessing se fait correctement. En supposant que je dispose d'une liste [1,2,3,4,5] généré par la fonction f1 qui est écrit dans un Queue (cercle vert à gauche). Je lance maintenant deux processus qui tirent sur cette file d'attente (en exécutant f2 dans les processus). Ils traitent les données, par exemple en doublant la valeur, et les écrivent dans la deuxième file d'attente. Maintenant, la fonction f3 lit ces données et les imprime.

layout of the data flow

À l'intérieur des fonctions, il y a une sorte de boucle, qui essaie de lire la file d'attente indéfiniment. Comment arrêter ce processus ?

Idée 1

f1 n'envoie pas seulement la liste, mais aussi un None ou un objet client, class PipelineTerminator: pass ou quelque chose de ce genre, qui se propage jusqu'en bas de l'échelle. f3 attend maintenant None à venir, quand il est là, il sort de la boucle. Problème : il est possible que l'un des deux f2 lit et propage le None tandis que l'autre traite encore un numéro. La dernière valeur est alors perdue.

Idée 2

f3 es f1 . La fonction f1 génère les données et les tuyaux, lance les processus avec f2 et alimente toutes les données. Après l'apparition et l'alimentation, il écoute sur le second tuyau, comptant et traitant simplement les objets reçus. Comme il sait combien de données ont été transmises, il peut mettre fin aux processus qui exécutent f2 . Mais si l'objectif est de mettre en place une filière de traitement, les différentes étapes doivent être séparables. C'est ainsi que f1 , f2 y f3 sont des éléments différents d'un pipeline, et les étapes coûteuses sont effectuées en parallèle.

Idée 3

pipeline idea 3

Chaque élément du pipeline est une fonction, cette fonction crée des processus comme elle le souhaite et est responsable de leur gestion. Elle sait combien de données sont entrées et combien de données ont été renvoyées (avec la fonction yield peut-être). Il n'y a donc aucun risque à propager un None objet.

setup child processes 

execute thread one and two and wait until both finished

thread 1:
    while True:
        pull from input queue
        if None: break and set finished_flag
        else: push to queue1 and increment counter1

thread 2:
    while True:
        pull from queue2
        increment counter2
        yield result
        if counter1 == counter2 and finished_flag: break

when both threads finished: kill process pool and return.

(Au lieu d'utiliser des fils, on peut peut-être imaginer une solution plus intelligente).

Alors ...

J'ai mis en œuvre une solution suivant l'idée 2, en alimentant et en attendant que les résultats arrivent, mais il ne s'agissait pas vraiment d'un pipeline avec des fonctions indépendantes branchées ensemble. Cela a fonctionné pour la tâche que je devais gérer, mais c'était difficile à maintenir.

J'aimerais que vous me disiez maintenant comment vous mettez en œuvre les pipelines (facilement dans un processus avec des fonctions de génération et ainsi de suite, mais avec plusieurs processus ?

0voto

Cristian Garcia Points 805

Pypeline le fait pour vous. Vous pouvez même choisir d'utiliser des processus, des threads ou des tâches asynchrones. Ce que vous voulez, c'est par exemple utiliser des processus :

import pypeln as pl

data = some_iterable()
data = pl.process.map(f2, data, workers = 3)
data = list(data)

Vous pouvez faire des choses plus complexes

import pypeln as pl

data = some_iterable()
data = pl.process.map(f2, data, workers = 3)
data = pl.process.filter(f3, data, workers = 1)
data = pl.process.flat_map(f4, data, workers = 5)
data = list(data)

Prograide.com

Prograide est une communauté de développeurs qui cherche à élargir la connaissance de la programmation au-delà de l'anglais.
Pour cela nous avons les plus grands doutes résolus en français et vous pouvez aussi poser vos propres questions ou résoudre celles des autres.

Powered by:

X