J'ai consulté de nombreuses réponses sur stack overflow et sur le web en essayant de mettre en place un moyen de faire du multitraitement en utilisant des files d'attente pour faire passer de grands cadres de données pandas. Il m'a semblé que toutes les réponses répétaient le même type de solutions sans tenir compte de la multitude de cas limites que l'on peut rencontrer lors de la mise en place de ce type de calculs. Le problème est qu'il y a beaucoup de choses en jeu en même temps. Le nombre de tâches, le nombre de travailleurs, la durée de chaque tâche et les exceptions possibles pendant l'exécution de la tâche. Tous ces éléments rendent la synchronisation délicate et la plupart des réponses n'indiquent pas comment s'y prendre. Voici donc mon point de vue après avoir bricolé pendant quelques heures, en espérant qu'il sera suffisamment générique pour que la plupart des gens le trouvent utile.
Quelques réflexions avant tout exemple de codage. Depuis queue.Empty
o queue.qsize()
ou toute autre méthode similaire n'est pas fiable pour le contrôle de flux, tout code de ce type
while True:
try:
task = pending_queue.get_nowait()
except queue.Empty:
break
est bidon. Cela tuera le travailleur même si, quelques millisecondes plus tard, une autre tâche apparaît dans la file d'attente. Le travailleur ne s'en remettra pas et, après un certain temps, TOUS les travailleurs disparaîtront car ils trouveront au hasard la file d'attente momentanément vide. Le résultat final sera que la fonction principale de multiprocessing (celle avec le join() sur les processus) reviendra sans que toutes les tâches soient terminées. Sympa. Bonne chance pour déboguer cela si vous avez des milliers de tâches et que quelques-unes sont manquantes.
L'autre problème est l'utilisation de valeurs sentinelles. De nombreuses personnes ont suggéré d'ajouter une valeur sentinelle dans la file d'attente pour signaler la fin de la file. Mais pour le signaler à qui exactement ? S'il y a N travailleurs, en supposant que N est le nombre de cœurs disponibles, alors une seule valeur sentinelle ne signalera la fin de la file d'attente qu'à un seul travailleur. Tous les autres travailleurs resteront en attente d'un travail supplémentaire alors qu'il n'y en a plus. Les exemples typiques que j'ai vus sont
while True:
task = pending_queue.get()
if task == SOME_SENTINEL_VALUE:
break
Un travailleur obtiendra la valeur sentinelle tandis que les autres attendront indéfiniment. Aucun article que j'ai trouvé ne mentionne que vous devez soumettre la valeur sentinelle à la file d'attente AU MOINS autant de fois que vous avez de travailleurs afin que TOUS les travailleurs l'obtiennent.
L'autre problème est le traitement des exceptions pendant l'exécution des tâches. Là encore, il faut les attraper et les gérer. De plus, si vous avez un completed_tasks
vous devez compter de manière indépendante et déterministe combien d'éléments se trouvent dans la file d'attente avant de décider que le travail est terminé. Encore une fois, se fier à la taille des files d'attente est voué à l'échec et donne des résultats inattendus.
Dans l'exemple ci-dessous, le par_proc()
recevra une liste de tâches comprenant les fonctions avec lesquelles ces tâches doivent être exécutées, ainsi que les arguments et valeurs nommés.
import multiprocessing as mp
import dill as pickle
import queue
import time
import psutil
SENTINEL = None
def do_work(tasks_pending, tasks_completed):
# Get the current worker's name
worker_name = mp.current_process().name
while True:
try:
task = tasks_pending.get_nowait()
except queue.Empty:
print(worker_name + ' found an empty queue. Sleeping for a while before checking again...')
time.sleep(0.01)
else:
try:
if task == SENTINEL:
print(worker_name + ' no more work left to be done. Exiting...')
break
print(worker_name + ' received some work... ')
time_start = time.perf_counter()
work_func = pickle.loads(task['func'])
result = work_func(**task['task'])
tasks_completed.put({work_func.__name__: result})
time_end = time.perf_counter() - time_start
print(worker_name + ' done in {} seconds'.format(round(time_end, 5)))
except Exception as e:
print(worker_name + ' task failed. ' + str(e))
tasks_completed.put({work_func.__name__: None})
def par_proc(job_list, num_cpus=None):
# Get the number of cores
if not num_cpus:
num_cpus = psutil.cpu_count(logical=False)
print('* Parallel processing')
print('* Running on {} cores'.format(num_cpus))
# Set-up the queues for sending and receiving data to/from the workers
tasks_pending = mp.Queue()
tasks_completed = mp.Queue()
# Gather processes and results here
processes = []
results = []
# Count tasks
num_tasks = 0
# Add the tasks to the queue
for job in job_list:
for task in job['tasks']:
expanded_job = {}
num_tasks = num_tasks + 1
expanded_job.update({'func': pickle.dumps(job['func'])})
expanded_job.update({'task': task})
tasks_pending.put(expanded_job)
# Use as many workers as there are cores (usually chokes the system so better use less)
num_workers = num_cpus
# We need as many sentinels as there are worker processes so that ALL processes exit when there is no more
# work left to be done.
for c in range(num_workers):
tasks_pending.put(SENTINEL)
print('* Number of tasks: {}'.format(num_tasks))
# Set-up and start the workers
for c in range(num_workers):
p = mp.Process(target=do_work, args=(tasks_pending, tasks_completed))
p.name = 'worker' + str(c)
processes.append(p)
p.start()
# Gather the results
completed_tasks_counter = 0
while completed_tasks_counter < num_tasks:
results.append(tasks_completed.get())
completed_tasks_counter = completed_tasks_counter + 1
for p in processes:
p.join()
return results
Et voici un test pour exécuter le code ci-dessus contre
def test_parallel_processing():
def heavy_duty1(arg1, arg2, arg3):
return arg1 + arg2 + arg3
def heavy_duty2(arg1, arg2, arg3):
return arg1 * arg2 * arg3
task_list = [
{'func': heavy_duty1, 'tasks': [{'arg1': 1, 'arg2': 2, 'arg3': 3}, {'arg1': 1, 'arg2': 3, 'arg3': 5}]},
{'func': heavy_duty2, 'tasks': [{'arg1': 1, 'arg2': 2, 'arg3': 3}, {'arg1': 1, 'arg2': 3, 'arg3': 5}]},
]
results = par_proc(task_list)
job1 = sum([y for x in results if 'heavy_duty1' in x.keys() for y in list(x.values())])
job2 = sum([y for x in results if 'heavy_duty2' in x.keys() for y in list(x.values())])
assert job1 == 15
assert job2 == 21
plus un autre avec quelques exceptions
def test_parallel_processing_exceptions():
def heavy_duty1_raises(arg1, arg2, arg3):
raise ValueError('Exception raised')
return arg1 + arg2 + arg3
def heavy_duty2(arg1, arg2, arg3):
return arg1 * arg2 * arg3
task_list = [
{'func': heavy_duty1_raises, 'tasks': [{'arg1': 1, 'arg2': 2, 'arg3': 3}, {'arg1': 1, 'arg2': 3, 'arg3': 5}]},
{'func': heavy_duty2, 'tasks': [{'arg1': 1, 'arg2': 2, 'arg3': 3}, {'arg1': 1, 'arg2': 3, 'arg3': 5}]},
]
results = par_proc(task_list)
job1 = sum([y for x in results if 'heavy_duty1' in x.keys() for y in list(x.values())])
job2 = sum([y for x in results if 'heavy_duty2' in x.keys() for y in list(x.values())])
assert not job1
assert job2 == 21
J'espère que cela vous aidera.