2 votes

Performances lentes de Dask lors de l'analyse d'une date CSV ?

J'ai fait beaucoup de traitement de texte sur une grosse pile de fichiers, y compris de gros CSV et de nombreux petits fichiers XML. Parfois, je fais des comptages globaux, mais la plupart du temps, je fais un travail de type NLP pour approfondir le contenu de ces fichiers au-delà de ce qui est étiqueté ou déjà structuré.

J'ai beaucoup utilisé la bibliothèque multiprocessing pour effectuer ces calculs sur plusieurs CPU, mais je suis tombé amoureux des idées derrière Dask et il est fortement recommandé à la fois sur le net et par les collègues de travail.

J'ai posé une question similaire sur les performances de Dask ici :

Lenteur des performances avec le sac Python Dask ?

et MRocklin ( https://stackoverflow.com/users/616616/mrocklin ) m'a fait savoir que le chargement d'un grand nombre de petits fichiers risquait de nuire aux performances.

Pourtant, lorsque je l'exécute sur de gros fichiers (200 Mo), je n'obtiens toujours pas de très bons résultats. Voici un exemple :

J'ai un fichier CSV de 900 000 lignes de tweets et je veux le charger rapidement et analyser le champ "created_at". Voici trois façons de procéder et les références pour chacune d'entre elles. J'ai utilisé un nouveau MacBook Pro i7 2016 avec 16 Go de mémoire vive.

import pandas
import dask.dataframe as dd
import multiprocessing

%%time
# Single Threaded, no chunking
d = pandas.read_csv("/Users/michaelshea/Documents/Data/tweet_text.csv", parse_dates = ["created_at"])
print(len(d))

Temps CPU : utilisateur 2min 31s, sys : 807 ms, total : 2min 32s Durée du mur : 2min 32s

%%time
# Multithreaded chunking
def parse_frame_dates(frame):
    frame["created_at"] = pandas.to_datetime(frame["created_at"])
    return(frame)

d = pandas.read_csv("/Users/michaelshea/Documents/Data/tweet_text.csv", chunksize = 100000)
frames = multiprocessing.Pool().imap_unordered(get_count, d)
td = pandas.concat(frames)
print(len(td))

Temps CPU : user 5.65 s, sys : 1.47 s, total : 7.12 s Durée du mur : 1min 10s

%%time
# Dask Load
d = dd.read_csv("/Users/michaelshea/Documents/Data/tweet_text.csv", 
                 parse_dates = ["created_at"], blocksize = 10000000).compute()

Temps CPU : utilisateur 2min 59s, sys : 26.2 s, total : 3min 25s Temps du mur : 3min 12s

J'ai trouvé ce genre de résultats dans de nombreuses comparaisons de Dask, mais même si je parviens à faire fonctionner ce système correctement, cela pourrait m'orienter dans la bonne direction.

En bref, comment puis-je obtenir les meilleures performances de Dask pour ce type de tâches ? Pourquoi les performances de Dask semblent-elles inférieures à celles des techniques monotâches et multithâches utilisées dans d'autres contextes ?

2voto

MRocklin Points 2855

Je pense que le code Pandas read_csv datetime parsing est purement Python, et ne bénéficiera donc pas beaucoup de l'utilisation de threads, ce que dask.dataframe utilise par défaut.

L'utilisation de processus peut s'avérer plus performante.

Je pense que la méthode suivante serait plus rapide :

import dask.multiprocessing
dask.set_options(get=dask.multiprocessing.get)  # set processes as default

d = dd.read_csv("/Users/michaelshea/Documents/Data/tweet_text.csv", 
                parse_dates = ["created_at"], blocksize = 10000000)
len(d)

Le problème avec les processus est que la communication inter-processus peut devenir coûteuse. Je calcule explicitement len(d) ci-dessus plutôt que d.compute() afin d'éviter d'avoir à ramasser toutes les images de données pandas dans les processus de travail et de les déplacer vers le processus d'appel principal. Dans la pratique, cela est assez courant, car les gens veulent rarement l'image de données complète, mais plutôt un calcul sur l'image de données.

La page du document en question est la suivante http://dask.readthedocs.io/en/latest/scheduler-choice.html

Vous pouvez également utiliser la fonction planificateur distribué sur une seule machine plutôt que d'utiliser le planificateur multiprocessus. Cette procédure est également décrite dans la documentation mentionnée ci-dessus.

$ pip install dask distributed

from dask.distributed import Client
c = Client()  # create processes and set as default

d = dd.read_csv("/Users/michaelshea/Documents/Data/tweet_text.csv", 
                parse_dates = ["created_at"], blocksize = 10000000)
len(d)

Prograide.com

Prograide est une communauté de développeurs qui cherche à élargir la connaissance de la programmation au-delà de l'anglais.
Pour cela nous avons les plus grands doutes résolus en français et vous pouvez aussi poser vos propres questions ou résoudre celles des autres.

Powered by:

X