150 votes

Afficher la progression d'un appel imap_unordered du pool de multitraitement Python ?

J'ai un script qui exécute avec succès un ensemble de tâches de pool multiprocessing avec un imap_unordered() appel :

p = multiprocessing.Pool()
rs = p.imap_unordered(do_work, xrange(num_tasks))
p.close() # No more work
p.join() # Wait for completion

Cependant, mon num_tasks est d'environ 250 000, et donc le join() bloque le thread principal pendant environ 10 secondes, et j'aimerais pouvoir envoyer un écho à la ligne de commande de manière incrémentielle pour montrer que le processus principal n'est pas bloqué. Quelque chose comme :

p = multiprocessing.Pool()
rs = p.imap_unordered(do_work, xrange(num_tasks))
p.close() # No more work
while (True):
  remaining = rs.tasks_remaining() # How many of the map call haven't been done yet?
  if (remaining == 0): break # Jump out of while loop
  print("Waiting for", remaining, "tasks to complete...")
  time.sleep(2)

Existe-t-il une méthode pour l'objet résultat ou le pool lui-même qui indique le nombre de tâches restantes ? J'ai essayé d'utiliser un multiprocessing.Value en tant que compteur ( do_work appelle un counter.value += 1 après avoir effectué sa tâche), mais le compteur n'atteint que ~85% de la valeur totale avant d'arrêter de s'incrémenter.

151voto

Tim Points 1227

Mon favori personnel - vous donne une jolie petite barre de progression et l'heure d'achèvement pendant que les choses s'exécutent et s'engagent en parallèle.

from multiprocessing import Pool
import tqdm

pool = Pool(processes=8)
for _ in tqdm.tqdm(pool.imap_unordered(do_work, tasks), total=len(tasks)):
    pass

96voto

J.F. Sebastian Points 102961

Il n'est pas nécessaire d'accéder aux attributs privés de l'ensemble des résultats :

from __future__ import division
import sys

for i, _ in enumerate(p.imap_unordered(do_work, xrange(num_tasks)), 1):
    sys.stderr.write('\rdone {0:%}'.format(i/num_tasks))

38voto

reubano Points 369

J'ai constaté que le travail était déjà terminé au moment où j'ai essayé de vérifier son état d'avancement. Voici ce qui a fonctionné pour moi en utilisant tqdm .

pip install tqdm

from multiprocessing import Pool
from tqdm import tqdm

tasks = range(5)
pool = Pool()
pbar = tqdm(total=len(tasks))

def do_work(x):
    # do something with x
    pbar.update(1)

pool.imap_unordered(do_work, tasks)
pool.close()
pool.join()
pbar.close()

Cela devrait fonctionner avec tous les types de multiprocessus, qu'ils soient bloqués ou non.

35voto

mrapacz Points 499

Comme l'a suggéré Tim, vous pouvez utiliser tqdm y imap pour résoudre ce problème. Je viens de tomber sur ce problème et j'ai modifié les paramètres de la imap_unordered afin que je puisse accéder aux résultats du mappage. Voici comment cela fonctionne :

from multiprocessing import Pool
import tqdm

pool = multiprocessing.Pool(processes=4)
mapped_values = list(tqdm.tqdm(pool.imap_unordered(do_work, range(num_tasks)), total=len(values)))

Si vous ne vous souciez pas des valeurs renvoyées par vos travaux, vous n'avez pas besoin d'assigner la liste à une variable.

23voto

MidnightLightning Points 2261

J'ai trouvé une réponse moi-même en creusant un peu plus : En jetant un coup d'œil sur le __dict__ de la imap_unordered j'ai constaté qu'il possédait un objet de résultat _index qui s'incrémente à chaque fois qu'une tâche est accomplie. Cela fonctionne donc pour la journalisation, enveloppée dans l'attribut while boucle :

p = multiprocessing.Pool()
rs = p.imap_unordered(do_work, xrange(num_tasks))
p.close() # No more work
while (True):
  completed = rs._index
  if (completed == num_tasks): break
  print "Waiting for", num_tasks-completed, "tasks to complete..."
  time.sleep(2)

Cependant, j'ai constaté que l'échange des imap_unordered pour un map_async a permis une exécution beaucoup plus rapide, bien que l'objet résultat soit un peu différent. Au lieu de cela, l'objet résultat de map_async a une _number_left et un attribut ready() méthode :

p = multiprocessing.Pool()
rs = p.map_async(do_work, xrange(num_tasks))
p.close() # No more work
while (True):
  if (rs.ready()): break
  remaining = rs._number_left
  print "Waiting for", remaining, "tasks to complete..."
  time.sleep(0.5)

Prograide.com

Prograide est une communauté de développeurs qui cherche à élargir la connaissance de la programmation au-delà de l'anglais.
Pour cela nous avons les plus grands doutes résolus en français et vous pouvez aussi poser vos propres questions ou résoudre celles des autres.

Powered by:

X