527 votes

comment paralléliser une simple boucle en python ?

C'est probablement une question triviale, mais comment puis-je paralléliser la boucle suivante en python ?

# setup output lists
output1 = list()
output2 = list()
output3 = list()

for j in range(0, 10):
    # calc individual parameter value
    parameter = j * offset
    # call the calculation
    out1, out2, out3 = calc_stuff(parameter = parameter)

    # put results into correct output list
    output1.append(out1)
    output2.append(out2)
    output3.append(out3)

Je sais comment lancer des threads uniques en python mais je ne sais pas comment "collecter" les résultats. Par conséquent, ma question est la suivante : quel est le moyen le plus simple de paralléliser ce code ?

2 votes

Une solution très simple pour paralléliser une for n'est pas encore mentionnée comme une réponse - il s'agirait de décorer simplement deux fonctions en utilisant la fonction deco paquet

307voto

Sven Marnach Points 133943

L'utilisation de plusieurs threads sur CPython ne vous donnera pas de meilleures performances pour du code purement Python à cause du verrou global de l'interpréteur (GIL). Je suggère d'utiliser le multiprocessing à la place :

pool = multiprocessing.Pool(4)
out1, out2, out3 = zip(*pool.map(calc_stuff, range(0, 10 * offset, offset)))

Notez que cela ne fonctionnera pas dans l'interpréteur interactif.

Pour éviter le FUD habituel autour du GIL : il n'y aurait de toute façon aucun avantage à utiliser les threads pour cet exemple. Vous veulent d'utiliser des processus ici, pas des threads, car ils évitent tout un tas de problèmes.

153 votes

Puisque c'est la réponse choisie, est-il possible d'avoir un exemple plus complet ? Quels sont les arguments de calc_stuff ?

8 votes

@EduardoPignatelli S'il vous plaît, lisez simplement la documentation de l'outil de gestion de l'environnement. multiprocessing pour des exemples plus complets. Pool.map() fonctionne essentiellement comme suit map() mais en parallèle.

7 votes

Existe-t-il un moyen d'ajouter simplement une barre de chargement tqdm à cette structure de code ? J'ai utilisé tqdm(pool.imap(calc_stuff, range(0, 10 * offset, offset))) mais je n'obtiens pas un graphique de barre de chargement complet.

200voto

FMan Points 666
from joblib import Parallel, delayed
def process(i):
    return i * i

results = Parallel(n_jobs=2)(delayed(process)(i) for i in range(10))
print(results)

Ce qui précède fonctionne à merveille sur ma machine (Ubuntu, le paquet joblib était pré-installé, mais peut être installé via pip install joblib ).

Tiré de https://blog.dominodatalab.com/simple-parallelization/


Modification le 31 mars 2021 : Sur joblib , multiprocessing , threading y asyncio

  • joblib dans le code ci-dessus utilise import multiprocessing sous le capot (et donc plusieurs processus, ce qui est généralement la meilleure façon d'exécuter le travail du CPU sur plusieurs cœurs - à cause de la GIL)
  • Vous pouvez laisser joblib utiliser des threads multiples au lieu de processus multiples, mais cela (ou l'utilisation de import threading directement) n'est bénéfique que si les threads passent un temps considérable en E/S (par exemple, lecture/écriture sur disque, envoi d'une requête HTTP). Pour les travaux d'E/S, la GIL ne bloque pas l'exécution d'un autre thread.
  • Depuis Python 3.7, comme alternative à threading vous pouvez paralléliser le travail avec asyncio mais les mêmes conseils s'appliquent pour import threading (mais contrairement à ce dernier, un seul fil sera utilisé).
  • L'utilisation de plusieurs processus entraîne des frais généraux. Vous devez vérifier vous-même si l'extrait de code ci-dessus améliore votre temps d'exécution. En voici un autre, pour lequel j'ai confirmé que joblib produit de meilleurs résultats :

    import time from joblib import Parallel, delayed

    def countdown(n): while n>0: n -= 1 return n

    t = time.time() for _ in range(20): print(countdown(10**7), end=" ") print(time.time() - t)

    takes ~10.5 seconds on medium sized Macbook Pro

    t = time.time() results = Parallel(njobs=2)(delayed(countdown)(10**7) for in range(20)) print(results) print(time.time() - t)

    takes ~6.3 seconds on medium sized Macbook Pro

10 votes

J'ai essayé votre code mais sur mon système, la version séquentielle de ce code prend environ une demi-minute et la version parallèle ci-dessus prend 4 minutes. Pourquoi ?

4 votes

Merci pour votre réponse ! Je pense que c'est la façon la plus élégante de le faire en 2019.

2 votes

@tyrex merci pour le partage ! ce package joblib est génial et l'exemple fonctionne pour moi. Cependant, dans un contexte plus complexe, j'ai eu un bug malheureusement. github.com/joblib/joblib/issues/949

142voto

Hamza Points 3028

C'est la façon la plus simple de le faire !

Vous pouvez utiliser asyncio . (La documentation peut être trouvée aquí ). Il est utilisé comme base pour de nombreux frameworks asynchrones Python qui fournissent des serveurs réseau et web performants, des bibliothèques de connexion aux bases de données, des files d'attente de tâches distribuées, etc. De plus, il dispose d'API de haut niveau et de bas niveau pour répondre à tout type de problème.

import asyncio

def background(f):
    def wrapped(*args, **kwargs):
        return asyncio.get_event_loop().run_in_executor(None, f, *args, **kwargs)

    return wrapped

@background
def your_function(argument):
    #code

Maintenant, cette fonction sera exécutée en parallèle chaque fois qu'elle sera appelée sans mettre le programme principal en état d'attente. Vous pouvez également l'utiliser pour paralléliser une boucle for. Lorsqu'elle est appelée pour une boucle for, bien que la boucle soit séquentielle, chaque itération s'exécute en parallèle au programme principal dès que l'interpréteur y arrive. Par exemple :

@background
def your_function(argument):
    time.sleep(5)
    print('function finished for '+str(argument))

for i in range(10):
    your_function(i)

print('loop finished')

Cela produit le résultat suivant :

loop finished
function finished for 4
function finished for 8
function finished for 0
function finished for 3
function finished for 6
function finished for 2
function finished for 5
function finished for 7
function finished for 9
function finished for 1

3 votes

Merci ! Je suis d'accord pour dire que c'est la manière la plus simple de le faire.

1 votes

Imaginez que vous ayez différentes impressions dans votre_fonction(), y a-t-il un moyen de la forcer à exécuter toutes les impressions puis à passer au i suivant dans la boucle for ?

6 votes

Bel exemple, mais y a-t-il un moyen d'attendre avant l'impression finale ? print('loop finished')

106voto

Gael Varoquaux Points 862

Pour paralléliser une simple boucle for, joblib apporte beaucoup de valeur à l'utilisation brute du multitraitement. Pas seulement la syntaxe courte, mais aussi des choses comme le regroupement transparent des itérations lorsqu'elles sont très rapides (pour supprimer l'overhead) ou la capture du traceback du processus enfant, pour avoir un meilleur rapport d'erreur.

Avertissement : Je suis l'auteur original de joblib.

3 votes

J'ai essayé joblib avec jupyter, cela ne fonctionne pas. Après l'appel Parallel-delayed, la page a cessé de fonctionner.

2 votes

Bonjour, j'ai un problème avec l'utilisation de joblib ( stackoverflow.com/questions/52166572/ ), avez-vous une idée de ce qui pourrait en être la cause ? Merci beaucoup.

1 votes

C'est quelque chose que je veux essayer ! Est-il possible de l'utiliser avec une double boucle, par exemple for i in range(10) : for j in range(20)

81voto

Aaron Hall Points 7381

Quel est le moyen le plus simple de paralléliser ce code ?

Utiliser un PoolExecutor de concurrent.futures . Comparez le code original avec celui-ci, côte à côte. Tout d'abord, la manière la plus concise d'aborder ce problème est de le faire avec executor.map :

...
with ProcessPoolExecutor() as executor:
    for out1, out2, out3 in executor.map(calc_stuff, parameters):
        ...

ou ventilé en soumettant chaque appel individuellement :

...
with ThreadPoolExecutor() as executor:
    futures = []
    for parameter in parameters:
        futures.append(executor.submit(calc_stuff, parameter))

    for future in futures:
        out1, out2, out3 = future.result() # this will block
        ...

La sortie du contexte signale à l'exécuteur qu'il doit libérer des ressources.

Vous pouvez utiliser des threads ou des processus et utiliser exactement la même interface.

Un exemple concret

Voici un exemple de code fonctionnel qui démontre la valeur de :

Mettez cela dans un fichier - futuretest.py :

from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
from time import time
from http.client import HTTPSConnection

def processor_intensive(arg):
    def fib(n): # recursive, processor intensive calculation (avoid n > 36)
        return fib(n-1) + fib(n-2) if n > 1 else n
    start = time()
    result = fib(arg)
    return time() - start, result

def io_bound(arg):
    start = time()
    con = HTTPSConnection(arg)
    con.request('GET', '/')
    result = con.getresponse().getcode()
    return time() - start, result

def manager(PoolExecutor, calc_stuff):
    if calc_stuff is io_bound:
        inputs = ('python.org', 'stackoverflow.com', 'stackexchange.com',
                  'noaa.gov', 'parler.com', 'aaronhall.dev')
    else:
        inputs = range(25, 32)
    timings, results = list(), list()
    start = time()
    with PoolExecutor() as executor:
        for timing, result in executor.map(calc_stuff, inputs):
            # put results into correct output list:
            timings.append(timing), results.append(result)
    finish = time()
    print(f'{calc_stuff.__name__}, {PoolExecutor.__name__}')
    print(f'wall time to execute: {finish-start}')
    print(f'total of timings for each call: {sum(timings)}')
    print(f'time saved by parallelizing: {sum(timings) - (finish-start)}')
    print(dict(zip(inputs, results)), end = '\n\n')

def main():
    for computation in (processor_intensive, io_bound):
        for pool_executor in (ProcessPoolExecutor, ThreadPoolExecutor):
            manager(pool_executor, calc_stuff=computation)

if __name__ == '__main__':
    main()

Et voici la sortie d'une exécution de python -m futuretest :

processor_intensive, ProcessPoolExecutor
wall time to execute: 0.7326343059539795
total of timings for each call: 1.8033506870269775
time saved by parallelizing: 1.070716381072998
{25: 75025, 26: 121393, 27: 196418, 28: 317811, 29: 514229, 30: 832040, 31: 1346269}

processor_intensive, ThreadPoolExecutor
wall time to execute: 1.190223217010498
total of timings for each call: 3.3561410903930664
time saved by parallelizing: 2.1659178733825684
{25: 75025, 26: 121393, 27: 196418, 28: 317811, 29: 514229, 30: 832040, 31: 1346269}

io_bound, ProcessPoolExecutor
wall time to execute: 0.533886194229126
total of timings for each call: 1.2977914810180664
time saved by parallelizing: 0.7639052867889404
{'python.org': 301, 'stackoverflow.com': 200, 'stackexchange.com': 200, 'noaa.gov': 301, 'parler.com': 200, 'aaronhall.dev': 200}

io_bound, ThreadPoolExecutor
wall time to execute: 0.38941240310668945
total of timings for each call: 1.6049387454986572
time saved by parallelizing: 1.2155263423919678
{'python.org': 301, 'stackoverflow.com': 200, 'stackexchange.com': 200, 'noaa.gov': 301, 'parler.com': 200, 'aaronhall.dev': 200}

Analyse à forte intensité de processeur

Lorsque vous effectuez des calculs intensifs en Python, attendez-vous à ce que la fonction ProcessPoolExecutor pour être plus performant que le ThreadPoolExecutor .

En raison du verrouillage global de l'interpréteur (alias le GIL), les threads ne peuvent pas utiliser plusieurs processeurs. Il faut donc s'attendre à ce que le temps de chaque calcul et le wall time (temps réel écoulé) soient plus importants.

Analyse liée aux OI

D'autre part, lors de l'exécution d'opérations liées à l'IO, attendez-vous à ce que ThreadPoolExecutor pour être plus performant que ProcessPoolExecutor .

Les threads de Python sont de vrais threads de système d'exploitation. Ils peuvent être mis en sommeil par le système d'exploitation et réveillés lorsque leurs informations arrivent.

Dernières réflexions

Je soupçonne que le multitraitement sera plus lent sous Windows, puisque Windows ne supporte pas la bifurcation et que chaque nouveau processus doit prendre du temps pour être lancé.

Vous pouvez imbriquer plusieurs threads à l'intérieur de plusieurs processus, mais il est recommandé de ne pas utiliser plusieurs threads pour lancer plusieurs processus.

Si vous êtes confronté à un problème de traitement lourd en Python, vous pouvez trivialement faire évoluer le système avec des processus supplémentaires - mais pas tellement avec le threading.

0 votes

ThreadPoolExecutor contourne-t-il les limitations imposées par la GIL ? De plus, ne faudrait-il pas joindre() pour attendre que les exécuteurs se terminent ou cela est-il pris en charge implicitement dans le gestionnaire de contexte ?

1 votes

Non et non, oui à "traité implicitement".

0 votes

Pour une raison quelconque, lorsque l'on augmente l'échelle du problème, le multithreading est extrêmement rapide, mais le multiprocessing génère un grand nombre de processus bloqués (dans macOS). Une idée de la raison de cette situation ? Le processus ne contient que des boucles imbriquées et des mathématiques, rien d'exotique.

Prograide.com

Prograide est une communauté de développeurs qui cherche à élargir la connaissance de la programmation au-delà de l'anglais.
Pour cela nous avons les plus grands doutes résolus en français et vous pouvez aussi poser vos propres questions ou résoudre celles des autres.

Powered by:

X