150 votes

Afficher la progression d'un appel imap_unordered du pool de multitraitement Python ?

J'ai un script qui exécute avec succès un ensemble de tâches de pool multiprocessing avec un imap_unordered() appel :

p = multiprocessing.Pool()
rs = p.imap_unordered(do_work, xrange(num_tasks))
p.close() # No more work
p.join() # Wait for completion

Cependant, mon num_tasks est d'environ 250 000, et donc le join() bloque le thread principal pendant environ 10 secondes, et j'aimerais pouvoir envoyer un écho à la ligne de commande de manière incrémentielle pour montrer que le processus principal n'est pas bloqué. Quelque chose comme :

p = multiprocessing.Pool()
rs = p.imap_unordered(do_work, xrange(num_tasks))
p.close() # No more work
while (True):
  remaining = rs.tasks_remaining() # How many of the map call haven't been done yet?
  if (remaining == 0): break # Jump out of while loop
  print("Waiting for", remaining, "tasks to complete...")
  time.sleep(2)

Existe-t-il une méthode pour l'objet résultat ou le pool lui-même qui indique le nombre de tâches restantes ? J'ai essayé d'utiliser un multiprocessing.Value en tant que compteur ( do_work appelle un counter.value += 1 après avoir effectué sa tâche), mais le compteur n'atteint que ~85% de la valeur totale avant d'arrêter de s'incrémenter.

1voto

Essayez cette approche simple basée sur les files d'attente, qui peut également être utilisée avec le pooling. N'oubliez pas que l'impression de quoi que ce soit après le lancement de la barre de progression entraînera son déplacement, du moins pour cette barre de progression particulière (PyPI's progress 1.5).

import time
from progress.bar import Bar

def status_bar( queue_stat, n_groups, n ):

    bar = Bar('progress', max = n)  

    finished = 0
    while finished < n_groups:

        while queue_stat.empty():
            time.sleep(0.01)

        gotten = queue_stat.get()
        if gotten == 'finished':
            finished += 1
        else:
            bar.next()
    bar.finish()

def process_data( queue_data, queue_stat, group):

    for i in group:

        ... do stuff resulting in new_data

        queue_stat.put(1)

    queue_stat.put('finished')  
    queue_data.put(new_data)

def multiprocess():

    new_data = []

    groups = [[1,2,3],[4,5,6],[7,8,9]]
    combined = sum(groups,[])

    queue_data = multiprocessing.Queue()
    queue_stat = multiprocessing.Queue()

    for i, group in enumerate(groups): 

        if i == 0:

            p = multiprocessing.Process(target = status_bar,
                args=(queue_stat,len(groups),len(combined)))
                processes.append(p)
                p.start()

        p = multiprocessing.Process(target = process_data,
        args=(queue_data, queue_stat, group))
        processes.append(p)
        p.start()

    for i in range(len(groups)):
        data = queue_data.get() 
        new_data += data

    for p in processes:
        p.join()

0voto

mohammad H Points 11

Certaines réponses fonctionnent avec la barre de progression, mais je n'ai pas pu obtenir de résultats dans la piscine.

J'ai utilisé tqdm pour créer une barre de progression vous pouvez l'installer en pip install tqdm

Le code simple ci-dessous fonctionne assez bien avec la barre de progression et vous pouvez également obtenir le résultat :

from multiprocessing import Pool
from tqdm import tqdm
from time import sleep

tasks = range(5)
result = []

def do_work(x):
    # do something with x and return the result
    sleep(2)
    return x + 2

if __name__ == '__main__':
    pbar = tqdm(total=len(tasks))

    with Pool(2) as p:
        for i in p.imap_unordered(do_work, tasks):

            result.append(i)
            pbar.update(i)

    pbar.close()
    print(result)

Prograide.com

Prograide est une communauté de développeurs qui cherche à élargir la connaissance de la programmation au-delà de l'anglais.
Pour cela nous avons les plus grands doutes résolus en français et vous pouvez aussi poser vos propres questions ou résoudre celles des autres.

Powered by:

X