106 votes

Comment effectuer une opération à l'intérieur d'une boucle en Python ?

Supposons que je dispose d'une très grande liste et que j'effectue une opération de ce type :

for item in items:
    try:
        api.my_operation(item)
    except:
        print 'error with item'

Mon problème est double :

  • Il y a beaucoup d'articles
  • api.my_operation met une éternité à revenir

J'aimerais utiliser le multithreading pour lancer plusieurs api.my_operations en même temps afin de pouvoir traiter 5, 10 ou même 100 éléments à la fois.

Si la fonction my_operation() renvoie une exception (parce que j'ai peut-être déjà traité cet élément), ce n'est pas grave. Cela n'interrompt rien. La boucle peut continuer jusqu'à l'élément suivant.

Note : ceci est pour Python 2.7.3

192voto

abarnert Points 94246

Tout d'abord, en Python, si votre code est lié au processeur, le multithreading n'est d'aucune utilité, car un seul thread peut détenir le Global Interpreter Lock, et donc exécuter le code Python, à la fois. Vous devez donc utiliser des processus, et non des threads.

Ce n'est pas le cas si votre opération "met une éternité à revenir" parce qu'elle est liée aux entrées-sorties, c'est-à-dire qu'elle attend sur le réseau ou sur des copies de disque, etc. Je reviendrai sur ce point plus tard.


Ensuite, pour traiter 5, 10 ou 100 articles à la fois, il faut créer un pool de 5, 10 ou 100 travailleurs et placer les articles dans une file d'attente que les travailleurs traitent. Heureusement, la bibliothèque stdlib multiprocessing y concurrent.futures Les deux bibliothèques vous donnent la plupart des détails.

Le premier est plus puissant et plus flexible pour la programmation traditionnelle ; le second est plus simple si vous avez besoin de composer avec l'attente future ; pour les cas triviaux, le choix n'a pas vraiment d'importance. (Dans ce cas, l'implémentation la plus évidente prend 3 lignes avec futures , 4 lignes avec multiprocessing .)

Si vous utilisez 2.6-2.7 ou 3.0-3.1, futures n'est pas intégré, mais vous pouvez l'installer à partir de PyPI ( pip install futures ).


Enfin, il est généralement beaucoup plus simple de paralléliser les choses si vous pouvez transformer l'itération entière de la boucle en un appel de fonction (quelque chose que vous pouvez, par exemple, passer à map ), alors commençons par cela :

def try_my_operation(item):
    try:
        api.my_operation(item)
    except:
        print('error with item')

La mise en place de l'ensemble :

executor = concurrent.futures.ProcessPoolExecutor(10)
futures = [executor.submit(try_my_operation, item) for item in items]
concurrent.futures.wait(futures)

Si vous avez beaucoup de travaux relativement petits, les frais généraux du multiprocessing risquent d'annihiler les avantages. La solution consiste à regrouper les travaux en lots plus importants. Par exemple (en utilisant grouper de la itertools recettes que vous pouvez copier et coller dans votre code, ou que vous pouvez obtenir à partir du fichier more-itertools sur PyPI) :

def try_multiple_operations(items):
    for item in items:
        try:
            api.my_operation(item)
        except:
            print('error with item')

executor = concurrent.futures.ProcessPoolExecutor(10)
futures = [executor.submit(try_multiple_operations, group) 
           for group in grouper(5, items)]
concurrent.futures.wait(futures)

Enfin, que se passe-t-il si votre code est lié aux entrées-sorties ? Les threads sont alors tout aussi efficaces que les processus, mais avec moins de surcharge (et moins de limitations, mais ces limitations ne vous affectent généralement pas dans des cas comme celui-ci). Parfois, cette "moindre surcharge" est suffisante pour que vous n'ayez pas besoin de batching avec les threads, mais que vous en ayez besoin avec les processus, ce qui est une bonne chose.

Comment utiliser les threads au lieu des processus ? Il suffit de modifier ProcessPoolExecutor a ThreadPoolExecutor .

Si vous n'êtes pas sûr de savoir si votre code est lié au processeur ou aux entrées-sorties, essayez-le dans les deux sens.


Est-ce que je peux faire cela pour plusieurs fonctions dans mon script en python ? Par exemple, si j'ai une autre boucle for ailleurs dans le code que je veux paralléliser. Est-il possible de faire deux fonctions multi threads dans le même script ?

Oui. En fait, il y a deux façons différentes de procéder.

Premièrement, vous pouvez partager le même exécuteur (thread ou processus) et l'utiliser à partir de plusieurs endroits sans aucun problème. L'intérêt des tâches et des contrats à terme est qu'ils sont autonomes ; vous ne vous souciez pas de l'endroit où ils s'exécutent, mais seulement du fait que vous les mettez en file d'attente et que vous finissez par obtenir la réponse.

Il est également possible d'avoir deux exécuteurs dans le même programme sans aucun problème. Cela a un coût en termes de performances : si vous utilisez les deux exécuteurs en même temps, vous finirez par essayer d'exécuter (par exemple) 16 threads occupés sur 8 cœurs, ce qui signifie qu'il y aura des changements de contexte. Mais parfois, cela vaut la peine de le faire parce que, par exemple, les deux exécuteurs sont rarement occupés en même temps, et cela rend votre code beaucoup plus simple. Ou peut-être qu'un exécuteur exécute des tâches très importantes qui peuvent prendre un certain temps, et que l'autre exécute des tâches très petites qui doivent être terminées le plus rapidement possible, parce que la réactivité est plus importante que le débit pour une partie de votre programme.

Si vous ne savez pas lequel est approprié pour votre programme, c'est généralement le premier.

61voto

woozyking Points 836

Il y a multiprocessing.pool, et l'exemple suivant illustre comment utiliser l'un d'entre eux :

from multiprocessing.pool import ThreadPool as Pool
# from multiprocessing import Pool

pool_size = 5  # your "parallelness"

# define worker function before a Pool is instantiated
def worker(item):
    try:
        api.my_operation(item)
    except:
        print('error with item')

pool = Pool(pool_size)

for item in items:
    pool.apply_async(worker, (item,))

pool.close()
pool.join()

Maintenant, si vous identifiez que votre processus est lié au CPU comme @abarnert l'a mentionné, changez ThreadPool pour l'implémentation du pool de processus (commenté sous l'importation de ThreadPool). Vous pouvez trouver plus de détails ici : http://docs.python.org/2/library/multiprocessing.html#using-a-pool-of-workers

24voto

Ryan Haining Points 5355

Vous pouvez diviser le traitement en un nombre déterminé de threads en utilisant une approche comme celle-ci :

import threading                                                                

def process(items, start, end):                                                 
    for item in items[start:end]:                                               
        try:                                                                    
            api.my_operation(item)                                              
        except Exception:                                                       
            print('error with item')                                            

def split_processing(items, num_splits=4):                                      
    split_size = len(items) // num_splits                                       
    threads = []                                                                
    for i in range(num_splits):                                                 
        # determine the indices of the list this thread will handle             
        start = i * split_size                                                  
        # special case on the last chunk to account for uneven splits           
        end = None if i+1 == num_splits else (i+1) * split_size                 
        # create the thread                                                     
        threads.append(                                                         
            threading.Thread(target=process, args=(items, start, end)))         
        threads[-1].start() # start the thread we just created                  

    # wait for all threads to finish                                            
    for t in threads:                                                           
        t.join()                                                                

split_processing(items)

11voto

Vinoj John Hosan Points 160
import numpy as np
import threading

def threaded_process(items_chunk):
    """ Your main process which runs in thread for each chunk"""
    for item in items_chunk:                                               
        try:                                                                    
            api.my_operation(item)                                              
        except Exception:                                                       
            print('error with item')  

n_threads = 20
# Splitting the items into chunks equal to number of threads
array_chunk = np.array_split(input_image_list, n_threads)
thread_list = []
for thr in range(n_threads):
    thread = threading.Thread(target=threaded_process, args=(array_chunk[thr]),)
    thread_list.append(thread)
    thread_list[thr].start()

for thread in thread_list:
    thread.join()

Prograide.com

Prograide est une communauté de développeurs qui cherche à élargir la connaissance de la programmation au-delà de l'anglais.
Pour cela nous avons les plus grands doutes résolus en français et vous pouvez aussi poser vos propres questions ou résoudre celles des autres.

Powered by:

X