112 votes

Transmettre plusieurs paramètres à concurrent.futures.Executor.map?

Le concurrent.futures.Executor.map prend un nombre variable d'itérables à partir desquels la fonction donnée est appelée. Comment dois-je l'appeler si j'ai un générateur qui produit des tuples qui sont normalement déballés sur place?

Le code suivant ne fonctionne pas car chaque tuple généré est donné comme un argument différent à map:

args = ((a, b) for (a, b) in c)
for result in executor.map(f, *args):
    pass

Sans le générateur, les arguments souhaités pour map pourraient ressembler à ceci:

executor.map(
    f,
    (i[0] for i in args),
    (i[1] for i in args),
    ...,
    (i[N] for i in args),
)

3voto

Tengerye Points 161

Pour ProcessPoolExecutor.map():

Similaire à map(func, *iterables) sauf que :

les iterables sont collectés immédiatement plutôt que de manière paresseuse

func est exécuté de manière asynchrone et plusieurs appels à func peuvent être faits concurrentiellement.

Par conséquent, l'utilisation de ProcessPoolExecutor.map() est la même que celle de la fonction intégrée map() de Python. Voici la documentation:

Retourne un itérateur qui applique la fonction à chaque élément de l'itérable, renvoyant les résultats. Si des arguments itérables supplémentaires sont passés, la fonction doit prendre autant d'arguments et est appliquée aux éléments de tous les itérables en parallèle.

Conclusion : passer plusieurs paramètres à map().

Essayez d'exécuter le code suivant sous python 3, et vous comprendrez bien:

from concurrent.futures import ProcessPoolExecutor

def f(a, b):
    print(a+b)

with ProcessPoolExecutor() as pool:
    pool.map(f, (0, 1, 2, 3, 4, 5, 6, 7, 8, 9), (0, 1, 2))

# 0, 2, 4

array = [(i, i) for i in range(3)]
with ProcessPoolExecutor() as pool:
    pool.map(f, *zip(*array))

# 0, 2, 4

2voto

H L Points 11

J'ai vu tant de réponses ici, mais aucune n'est aussi directe que l'utilisation d'expressions lambda :

foo(x, y): pass

veux-tu appeler la méthode ci-dessus 10 fois, avec la même valeur c'est-à-dire xVal et yVal ? avec concurrent.futures.ThreadPoolExecutor() as executor:

for _ in executor.map( lambda _: foo(xVal, yVal), range(0, 10)):
    pass

2voto

Vaibhav K Points 368

Supposons que vous avez des données comme celles-ci dans le tableau de données ci-dessous et que vous souhaitez passer les deux premières colonnes à une fonction qui lira les images, prédira les caractéristiques, calculera la différence et renverra la valeur de la différence.

Note : vous pouvez avoir n'importe quel scénario selon vos besoins et, en conséquence, vous pouvez définir la fonction.

Le code ci-dessous prendra ces deux colonnes comme arguments et les passera au mécanisme du Threadpool (affichant également la barre de progression).

enter image description here

''' fonction qui donnera la différence de deux matrices de caractéristiques numpy '''
def getDifference(image_1_loc, image_2_loc, esp=1e-7):
       arr1 = ''' lire la 1ère image et extraire les caractéristiques '''
       arr2 = ''' lire la 2ème image et extraire les caractéristiques '''
       diff = arr1.ravel() - arr2.ravel() + esp    
       return diff

''' Utilisation de ThreadPoolExecutor à partir de concurrent.futures avec plusieurs arguments '''

with ThreadPoolExecutor() as executor:
        result = np.array(
                         list(tqdm(
                                   executor.map(lambda x : function(*x), [(i,j) for i,j in df[['image_1','image_2']].values]),
                               total=len(df)
                                  ) 
                             )
                          )

enter image description here

-1voto

shanu khera Points 168

Un utilitaire simple que j'utilise tout le temps est ci-dessous.

########### Début du code utilitaire ###########

import os
import sys
import traceback

from concurrent import futures
from functools import partial

def catch(fn):
    def wrap(*args, **kwargs):
        result = None
        try:
            result = fn(*args, **kwargs)
        except Exception as err:
            type_, value_, traceback_ = sys.exc_info()
            return None, (
                args,
                "".join(traceback.format_exception(type_, value_, traceback_)),
            )
        else:
            return result, (args, None)

    return wrap

def top_level_wrap(fn, arg_tuple):
    args, kwargs = arg_tuple
    return fn(*args, *kwargs)

def create_processes(fn, values, handle_error, handle_success):
    cores = os.cpu_count()
    max_workers = 2 * cores + 1

    to_exec = partial(top_level_wrap, fn)

    with futures.ProcessPoolExecutor(max_workers=max_workers) as executor:
        for result, error in executor.map(to_exec, values):
            args, tb = error
            if tb is not None:
                handle_error(args, tb)
            else:
                handle_success(result)

########### Fin du code utilitaire ###########

Exemple d'utilisation -

######### Début de l'exemple d'utilisation ###########

import time

@catch
def fail_when_5(val):
    time.sleep(val)
    if val == 5:
        raise Exception("Erreur - val était 5")
    else:
        return f"Aucune erreur, val est {val}"

def handle_error(args, tb):
    print("args is", args)
    print("TB is", tb)

def top_level(val, val_2, test=None, test2="ok"):
    print(val_2, test, test2)
    return fail_when_5(val)

handle_success = print

if __name__ == "__main__":
    # FORME -> ( (args, kwargs), (args, kwargs), ... )
    values = tuple(
        ((x, x + 1), {"test": f"t_{x+2}", "test2": f"t_{x+3}"}) for x in range(10)
    )
    create_processes(top_level, values, handle_error, handle_success)

######### Fin de l'exemple d'utilisation ###########

-1voto

Cela fonctionne pour moi:

from concurrent.futures import ThreadPoolExecutor

def fonction_concurrente(fonction, liste):
  with ThreadPoolExecutor() as executeur:
    executeur.map(fonction, liste)

def multiplication_concurrente(args = {'a': 1, 'b': 2}):
  print(args['a']*args['b'])

fonction_concurrente(multiplication, [{'a': 1, 'b': 1}, 
                               {'a': 2, 'b': 2}, 
                               {'a': 3, 'b': 3}])

Prograide.com

Prograide est une communauté de développeurs qui cherche à élargir la connaissance de la programmation au-delà de l'anglais.
Pour cela nous avons les plus grands doutes résolus en français et vous pouvez aussi poser vos propres questions ou résoudre celles des autres.

Powered by:

X