178 votes

Comment exécuter des fonctions en parallèle ?

J'ai d'abord fait des recherches et je n'ai pas trouvé de réponse à ma question. J'essaie d'exécuter plusieurs fonctions en parallèle dans Python.

J'ai quelque chose comme ça :

files.py

import common #common is a util class that handles all the IO stuff

dir1 = 'C:\folder1'
dir2 = 'C:\folder2'
filename = 'test.txt'
addFiles = [25, 5, 15, 35, 45, 25, 5, 15, 35, 45]

def func1():
   c = common.Common()
   for i in range(len(addFiles)):
       c.createFiles(addFiles[i], filename, dir1)
       c.getFiles(dir1)
       time.sleep(10)
       c.removeFiles(addFiles[i], dir1)
       c.getFiles(dir1)

def func2():
   c = common.Common()
   for i in range(len(addFiles)):
       c.createFiles(addFiles[i], filename, dir2)
       c.getFiles(dir2)
       time.sleep(10)
       c.removeFiles(addFiles[i], dir2)
       c.getFiles(dir2)

Je veux appeler func1 et func2 et les faire fonctionner en même temps. Les fonctions n'interagissent pas entre elles ni sur le même objet. Pour l'instant, je dois attendre que func1 se termine avant que func2 ne démarre. Comment puis-je faire quelque chose comme ci-dessous :

process.py

from files import func1, func2

runBothFunc(func1(), func2())

Je veux pouvoir créer les deux répertoires à peu près en même temps, car je compte chaque minute le nombre de fichiers créés. Si le répertoire n'est pas là, cela perturbera mon timing.

255voto

NPE Points 169956

Vous pouvez utiliser threading o multiprocessing .

En raison de particularités de CPython , threading a peu de chances d'aboutir à un véritable parallélisme. C'est la raison pour laquelle, multiprocessing est généralement un meilleur pari.

Voici un exemple complet :

from multiprocessing import Process

def func1():
    print("func1: starting")
    for i in range(10000000):
        pass

    print("func1: finishing")

def func2():
    print("func2: starting")
    for i in range(10000000):
        pass

    print("func2: finishing")

if __name__ == "__main__":
    p1 = Process(target=func1)
    p1.start()
    p2 = Process(target=func2)
    p2.start()
    p1.join()
    p2.join()

La mécanique de démarrage/jonction des processus enfants peut facilement être encapsulée dans une fonction du type de la fonction runBothFunc :

def runInParallel(*fns):
  proc = []
  for fn in fns:
    p = Process(target=fn)
    p.start()
    proc.append(p)
  for p in proc:
    p.join()

runInParallel(func1, func2)

49voto

David Foster Points 693

Si vos fonctions consistent principalement à Travail E/S (et moins de travail CPU) et que vous avez Python 3.2+, vous pouvez utiliser un fichier ThreadPoolExecutor :

from concurrent.futures import ThreadPoolExecutor

def run_io_tasks_in_parallel(tasks):
    with ThreadPoolExecutor() as executor:
        running_tasks = [executor.submit(task) for task in tasks]
        for running_task in running_tasks:
            running_task.result()

run_io_tasks_in_parallel([
    lambda: print('IO task 1 running!'),
    lambda: print('IO task 2 running!'),
])

Si vos fonctions consistent principalement à Travail de l'unité centrale (et moins de travail d'E/S) et que vous avez Python 2.6+, vous pouvez utiliser l'option multiprocessing module :

from multiprocessing import Process

def run_cpu_tasks_in_parallel(tasks):
    running_tasks = [Process(target=task) for task in tasks]
    for running_task in running_tasks:
        running_task.start()
    for running_task in running_tasks:
        running_task.join()

run_cpu_tasks_in_parallel([
    lambda: print('CPU task 1 running!'),
    lambda: print('CPU task 2 running!'),
])

28voto

Ion Stoica Points 487

Cela peut être fait de manière élégante avec Ray un système qui vous permet de paralléliser et de distribuer facilement votre code Python.

Pour paralléliser votre exemple, vous devez définir vos fonctions avec l'option @ray.remote puis de les invoquer avec le décorateur .remote .

import ray

ray.init()

dir1 = 'C:\\folder1'
dir2 = 'C:\\folder2'
filename = 'test.txt'
addFiles = [25, 5, 15, 35, 45, 25, 5, 15, 35, 45]

# Define the functions. 
# You need to pass every global variable used by the function as an argument.
# This is needed because each remote function runs in a different process,
# and thus it does not have access to the global variables defined in 
# the current process.
@ray.remote
def func1(filename, addFiles, dir):
    # func1() code here...

@ray.remote
def func2(filename, addFiles, dir):
    # func2() code here...

# Start two tasks in the background and wait for them to finish.
ray.get([func1.remote(filename, addFiles, dir1), func2.remote(filename, addFiles, dir2)]) 

Si vous transmettez le même argument aux deux fonctions et que cet argument est de grande taille, il est plus efficace d'utiliser la fonction ray.put() . Cela évite de sérialiser deux fois le grand argument et d'en créer deux copies en mémoire :

largeData_id = ray.put(largeData)

ray.get([func1(largeData_id), func2(largeData_id)])

Important - Si func1() y func2() renvoie des résultats, vous devez réécrire le code comme suit :

ret_id1 = func1.remote(filename, addFiles, dir1)
ret_id2 = func2.remote(filename, addFiles, dir2)
ret1, ret2 = ray.get([ret_id1, ret_id2])

L'utilisation de Ray présente un certain nombre d'avantages par rapport à l'utilisation de l'Internet. multiprocessing module. En particulier, le module même code fonctionnera aussi bien sur une seule machine que sur une grappe de machines. Pour plus d'avantages de Ray, voir ce billet connexe .

17voto

BICube Points 2157

Il semble que vous ayez une seule fonction que vous devez appeler avec deux paramètres différents. Cela peut être fait de manière élégante en utilisant une combinaison de concurrent.futures y map avec Python 3.2+

import time
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

def sleep_secs(seconds):
  time.sleep(seconds)
  print(f'{seconds} has been processed')

secs_list = [2,4, 6, 8, 10, 12]

Maintenant, si votre opération est liée aux entrées-sorties, vous pouvez utiliser la fonction ThreadPoolExecutor comme tel :

with ThreadPoolExecutor() as executor:
  results = executor.map(sleep_secs, secs_list)

Notez comment map est utilisé ici pour map votre fonction à la liste des arguments.

Maintenant, si votre fonction est liée au processeur, vous pouvez utiliser ProcessPoolExecutor

with ProcessPoolExecutor() as executor:
  results = executor.map(sleep_secs, secs_list)

Si vous n'êtes pas sûr, vous pouvez simplement essayer les deux et voir lequel vous donne les meilleurs résultats.

Enfin, si vous souhaitez imprimer vos résultats, il vous suffit de le faire :

with ThreadPoolExecutor() as executor:
  results = executor.map(sleep_secs, secs_list)
  for result in results:
    print(result)

8voto

bruzzo Points 92

En 2021, la façon la plus simple est d'utiliser asyncio :

import asyncio, time

async def say_after(delay, what):
    await asyncio.sleep(delay)
    print(what)

async def main():

    task1 = asyncio.create_task(
        say_after(4, 'hello'))

    task2 = asyncio.create_task(
        say_after(3, 'world'))

    print(f"started at {time.strftime('%X')}")

    # Wait until both tasks are completed (should take
    # around 2 seconds.)
    await task1
    await task2

    print(f"finished at {time.strftime('%X')}")

asyncio.run(main())

Références :

[1] https://docs.python.org/3/library/asyncio-task.html

Prograide.com

Prograide est une communauté de développeurs qui cherche à élargir la connaissance de la programmation au-delà de l'anglais.
Pour cela nous avons les plus grands doutes résolus en français et vous pouvez aussi poser vos propres questions ou résoudre celles des autres.

Powered by:

X