150 votes

Afficher la progression d'un appel imap_unordered du pool de multitraitement Python ?

J'ai un script qui exécute avec succès un ensemble de tâches de pool multiprocessing avec un imap_unordered() appel :

p = multiprocessing.Pool()
rs = p.imap_unordered(do_work, xrange(num_tasks))
p.close() # No more work
p.join() # Wait for completion

Cependant, mon num_tasks est d'environ 250 000, et donc le join() bloque le thread principal pendant environ 10 secondes, et j'aimerais pouvoir envoyer un écho à la ligne de commande de manière incrémentielle pour montrer que le processus principal n'est pas bloqué. Quelque chose comme :

p = multiprocessing.Pool()
rs = p.imap_unordered(do_work, xrange(num_tasks))
p.close() # No more work
while (True):
  remaining = rs.tasks_remaining() # How many of the map call haven't been done yet?
  if (remaining == 0): break # Jump out of while loop
  print("Waiting for", remaining, "tasks to complete...")
  time.sleep(2)

Existe-t-il une méthode pour l'objet résultat ou le pool lui-même qui indique le nombre de tâches restantes ? J'ai essayé d'utiliser un multiprocessing.Value en tant que compteur ( do_work appelle un counter.value += 1 après avoir effectué sa tâche), mais le compteur n'atteint que ~85% de la valeur totale avant d'arrêter de s'incrémenter.

13voto

Julien Tourille Points 123

Je sais que c'est une question assez ancienne, mais voici ce que je fais quand je veux suivre la progression d'un ensemble de tâches en python.

from progressbar import ProgressBar, SimpleProgress
import multiprocessing as mp
from time import sleep

def my_function(letter):
    sleep(2)
    return letter+letter

dummy_args = ["A", "B", "C", "D"]
pool = mp.Pool(processes=2)

results = []

pbar = ProgressBar(widgets=[SimpleProgress()], maxval=len(dummy_args)).start()

r = [pool.apply_async(my_function, (x,), callback=results.append) for x in dummy_args]

while len(results) != len(dummy_args):
    pbar.update(len(results))
    sleep(0.5)
pbar.finish()

print results

En fait, vous utilisez apply_async avec un callbak (dans ce cas, il s'agit d'ajouter la valeur retournée à une liste), de sorte que vous n'ayez pas à attendre pour faire quelque chose d'autre. Ensuite, à l'intérieur d'une boucle while, vous vérifiez la progression du travail. Dans ce cas, j'ai ajouté un widget pour que ce soit plus joli.

Le résultat :

4 of 4                                                                         
['AA', 'BB', 'CC', 'DD']

J'espère que cela vous aidera.

9voto

zeawoas Points 87

Une solution simple avec Pool.apply_async() :

from multiprocessing import Pool
from tqdm import tqdm
from time import sleep

def work(x):
    sleep(0.2)
    return x**2

n = 10

with Pool(4) as p, tqdm(total=n) as pbar:
    res = [p.apply_async(
        work, args=(i,), callback=lambda _: pbar.update(1)) for i in range(n)]
    results = [r.get() for r in res]

6voto

XerCis Points 161

Démarrage rapide

Utilisation tqdm y multiprocessing.Pool

Installer

pip install tqdm

Exemple

import time
import threading
from multiprocessing import Pool

from tqdm import tqdm

def do_work(x):
    time.sleep(x)
    return x

def progress():
    time.sleep(3)  # Check progress after 3 seconds
    print(f'total: {pbar.total} finish:{pbar.n}')

tasks = range(10)
pbar = tqdm(total=len(tasks))

if __name__ == '__main__':
    thread = threading.Thread(target=progress)
    thread.start()
    results = []
    with Pool(processes=5) as pool:
        for result in pool.imap_unordered(do_work, tasks):
            results.append(result)
            pbar.update(1)
    print(results)

Résultat

Flacon

Installer

pip install flask

main.py

import time
from multiprocessing import Pool

from tqdm import tqdm
from flask import Flask, make_response, jsonify

app = Flask(__name__)

def do_work(x):
    time.sleep(x)
    return x

total = 5  # num of tasks
tasks = range(total)
pbar = tqdm(total=len(tasks))

@app.route('/run/')
def run():
    results = []
    with Pool(processes=2) as pool:
        for _result in pool.imap_unordered(do_work, tasks):
            results.append(_result)
            if pbar.n >= total:
                pbar.n = 0  # reset
            pbar.update(1)
    response = make_response(jsonify(dict(results=results)))
    response.headers.add('Access-Control-Allow-Origin', '*')
    response.headers.add('Access-Control-Allow-Headers', '*')
    response.headers.add('Access-Control-Allow-Methods', '*')
    return response

@app.route('/progress/')
def progress():
    response = make_response(jsonify(dict(n=pbar.n, total=pbar.total)))
    response.headers.add('Access-Control-Allow-Origin', '*')
    response.headers.add('Access-Control-Allow-Headers', '*')
    response.headers.add('Access-Control-Allow-Methods', '*')
    return response

Exécuter (sous Windows, par exemple)

set FLASK_APP=main
flask run

Liste des API

test.html

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>Progress Bar</title>
    <script src="https://cdn.bootcss.com/jquery/3.0.0/jquery.min.js"></script>
    <script src="https://cdn.bootcdn.net/ajax/libs/twitter-bootstrap/3.3.7/js/bootstrap.min.js"></script>
    <link href="https://cdn.bootcdn.net/ajax/libs/twitter-bootstrap/3.3.7/css/bootstrap.min.css" rel="stylesheet">
</head>
<body>
<button id="run">Run the task</button>
<br><br>
<div class="progress">
    <div class="progress-bar" role="progressbar" aria-valuenow="1" aria-valuemin="0" aria-valuemax="100"
         style="width: 10%">0.00%
    </div>
</div>
</body>
<script>
    function set_progress_rate(n, total) {
        //Set the rate of progress bar
        var rate = (n / total * 100).toFixed(2);
        if (n > 0) {
            $(".progress-bar").attr("aria-valuenow", n);
            $(".progress-bar").attr("aria-valuemax", total);
            $(".progress-bar").text(rate + "%");
            $(".progress-bar").css("width", rate + "%");
        }
    }

    $("#run").click(function () {
        //Run the task
        $.ajax({
            url: "http://127.0.0.1:5000/run/",
            type: "GET",
            success: function (response) {
                set_progress_rate(100, 100);
                console.log('Results:' + response['results']);
            }
        });
    });
    setInterval(function () {
        //Show progress every 1 second
        $.ajax({
            url: "http://127.0.0.1:5000/progress/",
            type: "GET",
            success: function (response) {
                console.log(response);
                var n = response["n"];
                var total = response["total"];
                set_progress_rate(n, total);
            }
        });
    }, 1000);
</script>
</html>

Résultat

4voto

Aronstef Points 109

J'ai créé une classe personnalisée pour créer une impression de l'état d'avancement. Cela peut peut-être aider :

from multiprocessing import Pool, cpu_count

class ParallelSim(object):
    def __init__(self, processes=cpu_count()):
        self.pool = Pool(processes=processes)
        self.total_processes = 0
        self.completed_processes = 0
        self.results = []

    def add(self, func, args):
        self.pool.apply_async(func=func, args=args, callback=self.complete)
        self.total_processes += 1

    def complete(self, result):
        self.results.extend(result)
        self.completed_processes += 1
        print('Progress: {:.2f}%'.format((self.completed_processes/self.total_processes)*100))

    def run(self):
        self.pool.close()
        self.pool.join()

    def get_results(self):
        return self.results

3voto

padu Points 49

Après quelques recherches, j'ai écrit un petit module appelé barre parallèle . Il vous permet d'afficher à la fois la progression globale du pool et celle de chaque noyau séparément. Il est facile à utiliser et possède une bonne description.

Par exemple :

from parallelbar import progress_map
from parallelbar.tools import cpu_bench

if __name__=='__main__':
    # create list of task
    tasks = [1_000_000 + i for i in range(100)]
    progress_map(cpu_bench, tasks)

Prograide.com

Prograide est une communauté de développeurs qui cherche à élargir la connaissance de la programmation au-delà de l'anglais.
Pour cela nous avons les plus grands doutes résolus en français et vous pouvez aussi poser vos propres questions ou résoudre celles des autres.

Powered by:

X