J'aimerais savoir comment le multiprocessing se fait correctement. En supposant que je dispose d'une liste [1,2,3,4,5]
généré par la fonction f1
qui est écrit dans un Queue
(cercle vert à gauche). Je lance maintenant deux processus qui tirent sur cette file d'attente (en exécutant f2
dans les processus). Ils traitent les données, par exemple en doublant la valeur, et les écrivent dans la deuxième file d'attente. Maintenant, la fonction f3
lit ces données et les imprime.
À l'intérieur des fonctions, il y a une sorte de boucle, qui essaie de lire la file d'attente indéfiniment. Comment arrêter ce processus ?
Idée 1
f1
n'envoie pas seulement la liste, mais aussi un None
ou un objet client, class PipelineTerminator: pass
ou quelque chose de ce genre, qui se propage jusqu'en bas de l'échelle. f3
attend maintenant None
à venir, quand il est là, il sort de la boucle. Problème : il est possible que l'un des deux f2
lit et propage le None
tandis que l'autre traite encore un numéro. La dernière valeur est alors perdue.
Idée 2
f3
es f1
. La fonction f1
génère les données et les tuyaux, lance les processus avec f2
et alimente toutes les données. Après l'apparition et l'alimentation, il écoute sur le second tuyau, comptant et traitant simplement les objets reçus. Comme il sait combien de données ont été transmises, il peut mettre fin aux processus qui exécutent f2
. Mais si l'objectif est de mettre en place une filière de traitement, les différentes étapes doivent être séparables. C'est ainsi que f1
, f2
y f3
sont des éléments différents d'un pipeline, et les étapes coûteuses sont effectuées en parallèle.
Idée 3
Chaque élément du pipeline est une fonction, cette fonction crée des processus comme elle le souhaite et est responsable de leur gestion. Elle sait combien de données sont entrées et combien de données ont été renvoyées (avec la fonction yield
peut-être). Il n'y a donc aucun risque à propager un None
objet.
setup child processes
execute thread one and two and wait until both finished
thread 1:
while True:
pull from input queue
if None: break and set finished_flag
else: push to queue1 and increment counter1
thread 2:
while True:
pull from queue2
increment counter2
yield result
if counter1 == counter2 and finished_flag: break
when both threads finished: kill process pool and return.
(Au lieu d'utiliser des fils, on peut peut-être imaginer une solution plus intelligente).
Alors ...
J'ai mis en œuvre une solution suivant l'idée 2, en alimentant et en attendant que les résultats arrivent, mais il ne s'agissait pas vraiment d'un pipeline avec des fonctions indépendantes branchées ensemble. Cela a fonctionné pour la tâche que je devais gérer, mais c'était difficile à maintenir.
J'aimerais que vous me disiez maintenant comment vous mettez en œuvre les pipelines (facilement dans un processus avec des fonctions de génération et ainsi de suite, mais avec plusieurs processus ?