J'ai fait beaucoup de traitement de texte sur une grosse pile de fichiers, y compris de gros CSV et de nombreux petits fichiers XML. Parfois, je fais des comptages globaux, mais la plupart du temps, je fais un travail de type NLP pour approfondir le contenu de ces fichiers au-delà de ce qui est étiqueté ou déjà structuré.
J'ai beaucoup utilisé la bibliothèque multiprocessing pour effectuer ces calculs sur plusieurs CPU, mais je suis tombé amoureux des idées derrière Dask et il est fortement recommandé à la fois sur le net et par les collègues de travail.
J'ai posé une question similaire sur les performances de Dask ici :
Lenteur des performances avec le sac Python Dask ?
et MRocklin ( https://stackoverflow.com/users/616616/mrocklin ) m'a fait savoir que le chargement d'un grand nombre de petits fichiers risquait de nuire aux performances.
Pourtant, lorsque je l'exécute sur de gros fichiers (200 Mo), je n'obtiens toujours pas de très bons résultats. Voici un exemple :
J'ai un fichier CSV de 900 000 lignes de tweets et je veux le charger rapidement et analyser le champ "created_at". Voici trois façons de procéder et les références pour chacune d'entre elles. J'ai utilisé un nouveau MacBook Pro i7 2016 avec 16 Go de mémoire vive.
import pandas
import dask.dataframe as dd
import multiprocessing
%%time
# Single Threaded, no chunking
d = pandas.read_csv("/Users/michaelshea/Documents/Data/tweet_text.csv", parse_dates = ["created_at"])
print(len(d))
Temps CPU : utilisateur 2min 31s, sys : 807 ms, total : 2min 32s Durée du mur : 2min 32s
%%time
# Multithreaded chunking
def parse_frame_dates(frame):
frame["created_at"] = pandas.to_datetime(frame["created_at"])
return(frame)
d = pandas.read_csv("/Users/michaelshea/Documents/Data/tweet_text.csv", chunksize = 100000)
frames = multiprocessing.Pool().imap_unordered(get_count, d)
td = pandas.concat(frames)
print(len(td))
Temps CPU : user 5.65 s, sys : 1.47 s, total : 7.12 s Durée du mur : 1min 10s
%%time
# Dask Load
d = dd.read_csv("/Users/michaelshea/Documents/Data/tweet_text.csv",
parse_dates = ["created_at"], blocksize = 10000000).compute()
Temps CPU : utilisateur 2min 59s, sys : 26.2 s, total : 3min 25s Temps du mur : 3min 12s
J'ai trouvé ce genre de résultats dans de nombreuses comparaisons de Dask, mais même si je parviens à faire fonctionner ce système correctement, cela pourrait m'orienter dans la bonne direction.
En bref, comment puis-je obtenir les meilleures performances de Dask pour ce type de tâches ? Pourquoi les performances de Dask semblent-elles inférieures à celles des techniques monotâches et multithâches utilisées dans d'autres contextes ?