192 votes

Faire en sorte que Pandas DataFrame apply() utilise tous les cœurs ?

Depuis août 2017, les pandas DataFame.apply() est malheureusement toujours limité à un seul cœur, ce qui signifie qu'une machine à plusieurs cœurs gaspillera la majorité de son temps de calcul lorsque vous exécutez le programme df.apply(myfunc, axis=1) .

Comment utiliser tous vos cœurs pour exécuter une application sur un cadre de données en parallèle ?

161voto

slhck Points 8460

Vous pouvez utiliser le swifter paquet :

pip install swifter

(Notez que vous pouvez vouloir utiliser ceci dans un virtualenv pour éviter les conflits de version avec les dépendances installées).

Swifter fonctionne comme un plugin pour pandas, ce qui vous permet de réutiliser la fonction apply fonction :

import swifter

def some_function(data):
    return data * 10

data['out'] = data['in'].swifter.apply(some_function)

Il trouvera automatiquement la manière la plus efficace de paralléliser la fonction, qu'elle soit vectorisée (comme dans l'exemple ci-dessus) ou non.

Autres exemples et un comparaison des performances sont disponibles sur GitHub. Notez que le paquet est en cours de développement actif, donc l'API peut changer.

Notez également que cette ne fonctionnera pas automatiquement pour les colonnes de type chaîne. Lors de l'utilisation de chaînes de caractères, Swifter se rabattra sur un "simple" Pandas apply qui ne seront pas parallèles. Dans ce cas, même en forçant l'utilisation de dask n'améliorera pas les performances, et vous feriez mieux de diviser votre ensemble de données manuellement et de paralléliser en utilisant multiprocessing .

3 votes

Par pure curiosité, existe-t-il un moyen de limiter le nombre de cœurs utilisés lors d'une application parallèle ? J'ai un serveur partagé, donc si je prends les 32 cœurs, personne ne sera content.

1 votes

@MaximHaytovich Je ne sais pas. Swifter utilise dask en arrière-plan, donc peut-être qu'il respecte ces paramètres : stackoverflow.com/a/40633117/435093 - Sinon, je vous recommande d'ouvrir un problème sur GitHub. L'auteur est très réactif.

0 votes

@slhck merci ! Je vais creuser un peu plus. Il semble ne pas fonctionner sur le serveur Windows de toute façon - il se bloque et ne fait rien sur la tâche du jouet.

138voto

Roko Mijic Points 1513

Le moyen le plus simple est d'utiliser map_partitions de Dask . Vous avez besoin de ces importations (vous devrez pip install dask ) :

import pandas as pd
import dask.dataframe as dd
from dask.multiprocessing import get

et la syntaxe est

data = <your_pandas_dataframe>
ddata = dd.from_pandas(data, npartitions=30)

def myfunc(x,y,z, ...): return <whatever>

res = ddata.map_partitions(lambda df: df.apply((lambda row: myfunc(*row)), axis=1)).compute(get=get)  

(Je pense que 30 est un nombre approprié de partitions si vous avez 16 cœurs). Juste pour être complet, j'ai chronométré la différence sur ma machine (16 cœurs) :

data = pd.DataFrame()
data['col1'] = np.random.normal(size = 1500000)
data['col2'] = np.random.normal(size = 1500000)

ddata = dd.from_pandas(data, npartitions=30)
def myfunc(x,y): return y*(x**2+1)
def apply_myfunc_to_DF(df): return df.apply((lambda row: myfunc(*row)), axis=1)
def pandas_apply(): return apply_myfunc_to_DF(data)
def dask_apply(): return ddata.map_partitions(apply_myfunc_to_DF).compute(get=get)  
def vectorized(): return myfunc(data['col1'], data['col2']  )

t_pds = timeit.Timer(lambda: pandas_apply())
print(t_pds.timeit(number=1))

28.16970546543598

t_dsk = timeit.Timer(lambda: dask_apply())
print(t_dsk.timeit(number=1))

2.708152851089835

t_vec = timeit.Timer(lambda: vectorized())
print(t_vec.timeit(number=1))

0.010668013244867325

Donner un Accélération d'un facteur 10 passer de pandas apply à dask apply sur les partitions. Bien sûr, si vous avez une fonction que vous pouvez vectoriser, vous devriez - dans ce cas, la fonction ( y*(x**2+1) ) est trivialement vectorisé, mais il y a beaucoup de choses qui sont impossibles à vectoriser.

2 votes

C'est bon à savoir, merci de le poster. Pouvez-vous expliquer pourquoi vous avez choisi 30 partitions ? Les performances changent-elles en changeant cette valeur ?

6 votes

@AndrewL Je suppose que chaque partition est gérée par un processus distinct, et avec 16 cœurs, je suppose que 16 ou 32 processus peuvent fonctionner simultanément. J'ai essayé, et les performances semblent s'améliorer jusqu'à 32 partitions, mais des augmentations supplémentaires n'ont aucun effet bénéfique. Je suppose qu'avec une machine à quatre cœurs, vous voudriez 8 partitions, etc. Notez que j'ai remarqué une certaine amélioration entre 16 et 32, donc je pense que vous voulez vraiment 2x$NUM_PROCESSORS.

17 votes

La seule chose est The get= keyword has been deprecated. Please use the scheduler= keyword instead with the name of the desired scheduler like 'threads' or 'processes'

48voto

G_KOBELIEF Points 91

Vous pouvez essayer pandarallel à la place : Un outil simple et efficace pour paralléliser vos opérations pandas sur tous vos CPUs (Sur Linux & macOS)

  • La parallélisation a un coût (instanciation de nouveaux processus, envoi de données via la mémoire partagée, etc ...), donc la parallélisation n'est efficace que si la quantité de calcul à paralléliser est suffisamment élevée. Pour une très petite quantité de données, utiliser la parallélisation ne vaut pas toujours la peine.
  • Les fonctions appliquées ne doivent PAS être des fonctions lambda.

    from pandarallel import pandarallel from math import sin

    pandarallel.initialize()

    FORBIDDEN

    df.parallel_apply(lambda x: sin(x**2), axis=1)

    ALLOWED

    def func(x): return sin(x**2)

    df.parallel_apply(func, axis=1)

voir https://github.com/nalepae/pandarallel

1 votes

Bonjour, je n'arrive pas à résoudre un problème, en utilisant pandarallel il y a une erreur : AttributeError : Can't pickle local object 'prepare_worker.<locals>.closure.<locals>.wrapper' . Pouvez-vous m'aider à résoudre ce problème ?

0 votes

@Alex Désolé, je ne suis pas le développeur de ce module. A quoi ressemble votre code ? Vous pouvez essayer de déclarer vos "fonctions internes" comme globales ? (juste une supposition)

0 votes

@AlexCam Votre fonction doit être définie en dehors d'une autre fonction afin que python puisse la récupérer pour le multitraitement.

48voto

Olivier_Cruchant Points 391

Si vous voulez rester en python natif :

import multiprocessing as mp

with mp.Pool(mp.cpu_count()) as pool:
    df['newcol'] = pool.map(f, df['col'])

appliquera la fonction f de façon parallèle à la colonne col de l'image de données df

0 votes

En suivant une telle approche, j'ai obtenu un ValueError: Length of values does not match length of index de __setitem__ en pandas/core/frame.py . Je ne sais pas si j'ai fait quelque chose de mal, ou si le fait d'assigner aux df['newcol'] n'est pas sécurisée.

2 votes

Vous pouvez écrire le pool.map dans une liste intermédiaire temp_result pour permettre de vérifier si la longueur correspond à celle du df, et ensuite faire un df['newcol'] = temp_result ?

0 votes

Vous voulez dire créer une nouvelle colonne ? qu'utiliseriez-vous ?

4voto

Maxim Balatsko Points 11

Voici un exemple de transformateur de base sklearn, dans lequel pandas apply est parallélisé

import multiprocessing as mp
from sklearn.base import TransformerMixin, BaseEstimator

class ParllelTransformer(BaseEstimator, TransformerMixin):
    def __init__(self,
                 n_jobs=1):
        """
        n_jobs - parallel jobs to run
        """
        self.variety = variety
        self.user_abbrevs = user_abbrevs
        self.n_jobs = n_jobs
    def fit(self, X, y=None):
        return self
    def transform(self, X, *_):
        X_copy = X.copy()
        cores = mp.cpu_count()
        partitions = 1

        if self.n_jobs <= -1:
            partitions = cores
        elif self.n_jobs <= 0:
            partitions = 1
        else:
            partitions = min(self.n_jobs, cores)

        if partitions == 1:
            # transform sequentially
            return X_copy.apply(self._transform_one)

        # splitting data into batches
        data_split = np.array_split(X_copy, partitions)

        pool = mp.Pool(cores)

        # Here reduce function - concationation of transformed batches
        data = pd.concat(
            pool.map(self._preprocess_part, data_split)
        )

        pool.close()
        pool.join()
        return data
    def _transform_part(self, df_part):
        return df_part.apply(self._transform_one)
    def _transform_one(self, line):
        # some kind of transformations here
        return line

Pour plus d'informations, voir https://towardsdatascience.com/4-easy-steps-to-improve-your-machine-learning-code-performance-88a0b0eeffa8

1 votes

Ce qui est : self._preprocess_part ? Je ne trouve que _transform_part

Prograide.com

Prograide est une communauté de développeurs qui cherche à élargir la connaissance de la programmation au-delà de l'anglais.
Pour cela nous avons les plus grands doutes résolus en français et vous pouvez aussi poser vos propres questions ou résoudre celles des autres.

Powered by:

X