83 votes

Existe-t-il une carte parallèle simple basée sur les processus pour python ?

Je suis à la recherche d'une carte parallèle simple basée sur les processus pour python, c'est-à-dire une fonction

parmap(function,[data])

qui exécuterait une fonction sur chaque élément de [data] sur un processus différent (enfin, sur un noyau différent, mais à ma connaissance, la seule façon d'exécuter des fonctions sur différents noyaux en python est de lancer plusieurs interpréteurs), et renverrait une liste de résultats.

Est-ce que quelque chose comme ça existe ? J'aimerais quelque chose simple Un module simple serait donc le bienvenu. Bien sûr, si une telle chose n'existe pas, je me contenterai d'une grande bibliothèque :-/

145voto

Flávio Amieiro Points 5872

J'ai l'impression que ce dont vous avez besoin est le Méthode map dans multiprocessing.Pool() :

map(func, iterable[, chunksize])

A parallel equivalent of the map() built-in function (it supports only
one iterable argument though). It blocks till the result is ready.

This method chops the iterable into a number of chunks which it submits to the 
process pool as separate tasks. The (approximate) size of these chunks can be 
specified by setting chunksize to a positive integ

Par exemple, si vous voulez mettre en correspondance cette fonction :

def f(x):
    return x**2

à la plage(10), vous pouvez le faire en utilisant la fonction intégrée map() :

map(f, range(10))

ou en utilisant la méthode map() d'un objet multiprocessing.Pool() :

import multiprocessing
pool = multiprocessing.Pool()
print pool.map(f, range(10))

8 votes

Si vous l'invoquez depuis un programme à longue durée de vie, assurez-vous d'appeler pool.close (idéalement dans le finally d'un bloc de fermeture try/finally ). Sinon, le pool peut échouer à nettoyer les processus enfants et vous pouvez vous retrouver avec des processus zombies. Voir bugs.python.org/issue19675

11 votes

@rogueleaderr Ne serait-il pas plus idiomatique d'utiliser with ?

3 votes

Bon point @CodeMonkey ! Le premier exemple sur le documents officiels utilise with donc ça devrait bien gérer le nettoyage.

14voto

Ion Stoica Points 487

Cela peut être fait de manière élégante avec Ray un système qui vous permet de paralléliser et de distribuer facilement votre code Python.

Pour paralléliser votre exemple, vous devrez définir votre fonction map avec la fonction @ray.remote et l'invoquer ensuite avec .remote . Ainsi, chaque instance de la fonction distante sera exécutée dans un processus différent.

import time
import ray

ray.init()

# Define the function you want to apply map on, as remote function. 
@ray.remote
def f(x):
    # Do some work...
    time.sleep(1)
    return x*x

# Define a helper parmap(f, list) function.
# This function executes a copy of f() on each element in "list".
# Each copy of f() runs in a different process.
# Note f.remote(x) returns a future of its result (i.e., 
# an identifier of the result) rather than the result itself.  
def parmap(f, list):
    return [f.remote(x) for x in list]

# Call parmap() on a list consisting of first 5 integers.
result_ids = parmap(f, range(1, 6))

# Get the results
results = ray.get(result_ids)
print(results)

Cela va s'imprimer :

[1, 4, 9, 16, 25]

et il se terminera dans environ len(list)/p (arrondi au nombre entier le plus proche) où p est le nombre de cœurs sur votre machine. En supposant une machine avec 2 cœurs, notre exemple s'exécutera en 5/2 arrondi vers le haut, c'est-à-dire en environ 3 sec.

Il y a un certain nombre d'avantages à utiliser Ray par rapport au multitraitement module. En particulier, le même code fonctionnera aussi bien sur une seule machine que sur un cluster de machines. Pour plus d'avantages de Ray, voir ce billet connexe .

12voto

bresson Points 19

La classe Pool de Python3 possède une méthode map() et c'est tout ce dont vous avez besoin pour paralléliser map :

from multiprocessing import Pool

with Pool() as P:
    xtransList = P.map(some_func, a_list)

Utilisation de with Pool() as P est similaire à un pool de processus et exécutera chaque élément de la liste en parallèle. Vous pouvez fournir le nombre de cœurs :

with Pool(processes=4) as P:

7voto

Good Will Points 411

Pour ceux qui cherchent l'équivalent Python de mclapply() de R, voici mon implémentation. Il s'agit d'une amélioration des deux exemples suivants :

Il peut être appliqué aux fonctions de mappage avec des arguments uniques ou multiples.

import numpy as np, pandas as pd
from scipy import sparse
import functools, multiprocessing
from multiprocessing import Pool

num_cores = multiprocessing.cpu_count()

def parallelize_dataframe(df, func, U=None, V=None):

    #blockSize = 5000
    num_partitions = 5 # int( np.ceil(df.shape[0]*(1.0/blockSize)) )
    blocks = np.array_split(df, num_partitions)

    pool = Pool(num_cores)
    if V is not None and U is not None:
        # apply func with multiple arguments to dataframe (i.e. involves multiple columns)
        df = pd.concat(pool.map(functools.partial(func, U=U, V=V), blocks))
    else:
        # apply func with one argument to dataframe (i.e. involves single column)
        df = pd.concat(pool.map(func, blocks))

    pool.close()
    pool.join()

    return df

def square(x):
    return x**2

def test_func(data):
    print("Process working on: ", data.shape)
    data["squareV"] = data["testV"].apply(square)
    return data

def vecProd(row, U, V):
    return np.sum( np.multiply(U[int(row["obsI"]),:], V[int(row["obsJ"]),:]) )

def mProd_func(data, U, V):
    data["predV"] = data.apply( lambda row: vecProd(row, U, V), axis=1 )
    return data

def generate_simulated_data():

    N, D, nnz, K = [302, 184, 5000, 5]
    I = np.random.choice(N, size=nnz, replace=True)
    J = np.random.choice(D, size=nnz, replace=True)
    vals = np.random.sample(nnz)

    sparseY = sparse.csc_matrix((vals, (I, J)), shape=[N, D])

    # Generate parameters U and V which could be used to reconstruct the matrix Y
    U = np.random.sample(N*K).reshape([N,K])
    V = np.random.sample(D*K).reshape([D,K])

    return sparseY, U, V

def main():
    Y, U, V = generate_simulated_data()

    # find row, column indices and obvseved values for sparse matrix Y
    (testI, testJ, testV) = sparse.find(Y)

    colNames = ["obsI", "obsJ", "testV", "predV", "squareV"]
    dtypes = {"obsI":int, "obsJ":int, "testV":float, "predV":float, "squareV": float}

    obsValDF = pd.DataFrame(np.zeros((len(testV), len(colNames))), columns=colNames)
    obsValDF["obsI"] = testI
    obsValDF["obsJ"] = testJ
    obsValDF["testV"] = testV
    obsValDF = obsValDF.astype(dtype=dtypes)

    print("Y.shape: {!s}, #obsVals: {}, obsValDF.shape: {!s}".format(Y.shape, len(testV), obsValDF.shape))

    # calculate the square of testVals    
    obsValDF = parallelize_dataframe(obsValDF, test_func)

    # reconstruct prediction of testVals using parameters U and V
    obsValDF = parallelize_dataframe(obsValDF, mProd_func, U, V)

    print("obsValDF.shape after reconstruction: {!s}".format(obsValDF.shape))
    print("First 5 elements of obsValDF:\n", obsValDF.iloc[:5,:])

if __name__ == '__main__':
    main()

3voto

Justin Winokur Points 21

Je sais que c'est un vieux post, mais juste au cas où, j'ai écrit un outil pour rendre cela super, super facile appelé parmapper (En fait, je l'appelle parmap dans mon utilisation mais le nom a été pris).

Il gère une grande partie de la configuration et de la déconstruction des processus et ajoute des tonnes de fonctionnalités. Par ordre d'importance

  • Peut prendre des fonctions lambda et d'autres fonctions indécrottables
  • Peut appliquer starmap et d'autres méthodes d'appel similaires pour le rendre très facile à utiliser directement.
  • Peut être réparti entre les deux threads et/ou processus
  • Comprend des fonctionnalités telles que les barres de progression

Cela entraîne un petit coût, mais pour la plupart des utilisations, c'est négligeable.

J'espère que vous le trouverez utile.

(Note : Il, comme map en Python 3+, renvoie un itérable, donc si vous vous attendez à ce que tous les résultats passent par lui immédiatement, utilisez list() )

Prograide.com

Prograide est une communauté de développeurs qui cherche à élargir la connaissance de la programmation au-delà de l'anglais.
Pour cela nous avons les plus grands doutes résolus en français et vous pouvez aussi poser vos propres questions ou résoudre celles des autres.

Powered by:

X