Quel est le moyen le plus simple de paralléliser ce code ?
Utiliser un PoolExecutor de concurrent.futures
. Comparez le code original avec celui-ci, côte à côte. Tout d'abord, la manière la plus concise d'aborder ce problème est de le faire avec executor.map
:
...
with ProcessPoolExecutor() as executor:
for out1, out2, out3 in executor.map(calc_stuff, parameters):
...
ou ventilé en soumettant chaque appel individuellement :
...
with ThreadPoolExecutor() as executor:
futures = []
for parameter in parameters:
futures.append(executor.submit(calc_stuff, parameter))
for future in futures:
out1, out2, out3 = future.result() # this will block
...
La sortie du contexte signale à l'exécuteur qu'il doit libérer des ressources.
Vous pouvez utiliser des threads ou des processus et utiliser exactement la même interface.
Un exemple concret
Voici un exemple de code fonctionnel qui démontre la valeur de :
Mettez cela dans un fichier - futuretest.py :
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
from time import time
from http.client import HTTPSConnection
def processor_intensive(arg):
def fib(n): # recursive, processor intensive calculation (avoid n > 36)
return fib(n-1) + fib(n-2) if n > 1 else n
start = time()
result = fib(arg)
return time() - start, result
def io_bound(arg):
start = time()
con = HTTPSConnection(arg)
con.request('GET', '/')
result = con.getresponse().getcode()
return time() - start, result
def manager(PoolExecutor, calc_stuff):
if calc_stuff is io_bound:
inputs = ('python.org', 'stackoverflow.com', 'stackexchange.com',
'noaa.gov', 'parler.com', 'aaronhall.dev')
else:
inputs = range(25, 32)
timings, results = list(), list()
start = time()
with PoolExecutor() as executor:
for timing, result in executor.map(calc_stuff, inputs):
# put results into correct output list:
timings.append(timing), results.append(result)
finish = time()
print(f'{calc_stuff.__name__}, {PoolExecutor.__name__}')
print(f'wall time to execute: {finish-start}')
print(f'total of timings for each call: {sum(timings)}')
print(f'time saved by parallelizing: {sum(timings) - (finish-start)}')
print(dict(zip(inputs, results)), end = '\n\n')
def main():
for computation in (processor_intensive, io_bound):
for pool_executor in (ProcessPoolExecutor, ThreadPoolExecutor):
manager(pool_executor, calc_stuff=computation)
if __name__ == '__main__':
main()
Et voici la sortie d'une exécution de python -m futuretest
:
processor_intensive, ProcessPoolExecutor
wall time to execute: 0.7326343059539795
total of timings for each call: 1.8033506870269775
time saved by parallelizing: 1.070716381072998
{25: 75025, 26: 121393, 27: 196418, 28: 317811, 29: 514229, 30: 832040, 31: 1346269}
processor_intensive, ThreadPoolExecutor
wall time to execute: 1.190223217010498
total of timings for each call: 3.3561410903930664
time saved by parallelizing: 2.1659178733825684
{25: 75025, 26: 121393, 27: 196418, 28: 317811, 29: 514229, 30: 832040, 31: 1346269}
io_bound, ProcessPoolExecutor
wall time to execute: 0.533886194229126
total of timings for each call: 1.2977914810180664
time saved by parallelizing: 0.7639052867889404
{'python.org': 301, 'stackoverflow.com': 200, 'stackexchange.com': 200, 'noaa.gov': 301, 'parler.com': 200, 'aaronhall.dev': 200}
io_bound, ThreadPoolExecutor
wall time to execute: 0.38941240310668945
total of timings for each call: 1.6049387454986572
time saved by parallelizing: 1.2155263423919678
{'python.org': 301, 'stackoverflow.com': 200, 'stackexchange.com': 200, 'noaa.gov': 301, 'parler.com': 200, 'aaronhall.dev': 200}
Analyse à forte intensité de processeur
Lorsque vous effectuez des calculs intensifs en Python, attendez-vous à ce que la fonction ProcessPoolExecutor
pour être plus performant que le ThreadPoolExecutor
.
En raison du verrouillage global de l'interpréteur (alias le GIL), les threads ne peuvent pas utiliser plusieurs processeurs. Il faut donc s'attendre à ce que le temps de chaque calcul et le wall time (temps réel écoulé) soient plus importants.
Analyse liée aux OI
D'autre part, lors de l'exécution d'opérations liées à l'IO, attendez-vous à ce que ThreadPoolExecutor
pour être plus performant que ProcessPoolExecutor
.
Les threads de Python sont de vrais threads de système d'exploitation. Ils peuvent être mis en sommeil par le système d'exploitation et réveillés lorsque leurs informations arrivent.
Dernières réflexions
Je soupçonne que le multitraitement sera plus lent sous Windows, puisque Windows ne supporte pas la bifurcation et que chaque nouveau processus doit prendre du temps pour être lancé.
Vous pouvez imbriquer plusieurs threads à l'intérieur de plusieurs processus, mais il est recommandé de ne pas utiliser plusieurs threads pour lancer plusieurs processus.
Si vous êtes confronté à un problème de traitement lourd en Python, vous pouvez trivialement faire évoluer le système avec des processus supplémentaires - mais pas tellement avec le threading.
2 votes
Une solution très simple pour paralléliser une
for
n'est pas encore mentionnée comme une réponse - il s'agirait de décorer simplement deux fonctions en utilisant la fonctiondeco
paquet