19 votes

Pourquoi la lecture simultanée de plusieurs fichiers est-elle plus lente que la lecture séquentielle ?

J'essaie d'analyser plusieurs fichiers trouvés dans un répertoire, mais l'utilisation du multiprocessus ralentit mon programme.

# Calling my parsing function from Client.
L = getParsedFiles('/home/tony/Lab/slicedFiles') <--- 1000 .txt files found here.
                                                       combined ~100MB

En suivant cet exemple tiré de la documentation python :

from multiprocessing import Pool

def f(x):
    return x*x

if __name__ == '__main__':
    p = Pool(5)
    print(p.map(f, [1, 2, 3]))

J'ai écrit ce morceau de code :

from multiprocessing import Pool
from api.ttypes import *

import gc
import os

def _parse(pathToFile):
    myList = []
    with open(pathToFile) as f:
        for line in f:
            s = line.split()
            x, y = [int(v) for v in s]
            obj = CoresetPoint(x, y)
            gc.disable()
            myList.append(obj)
            gc.enable()
    return Points(myList)

def getParsedFiles(pathToFile):
    myList = []
    p = Pool(2)
    for filename in os.listdir(pathToFile):
        if filename.endswith(".txt"):
            myList.append(filename)
    return p.map(_pars, , myList)

J'ai suivi l'exemple, j'ai mis tous les noms des fichiers qui se terminent par un .txt dans une liste, puis j'ai créé des pools et je les ai associés à ma fonction. Je souhaite ensuite renvoyer une liste d'objets. Chaque objet contient les données analysées d'un fichier. Cependant, je suis étonné d'obtenir les résultats suivants :

#Pool 32  ---> ~162(s)
#Pool 16 ---> ~150(s)
#Pool 12 ---> ~142(s)
#Pool 2 ---> ~130(s)

Graphique :
enter image description here

Spécification de la machine :

62.8 GiB RAM
Intel® Core™ i7-6850K CPU @ 3.60GHz × 12   

Qu'est-ce qui m'échappe ?
Merci d'avance !

14voto

Peter Wood Points 4536

On dirait que vous êtes Liaison E/S :

En informatique, l'expression "I/O bound" fait référence à une situation dans laquelle le temps nécessaire pour effectuer un calcul est principalement déterminé par la période d'attente des opérations d'entrée/sortie. C'est l'inverse d'une tâche liée à l'unité centrale. Cette situation se produit lorsque la vitesse à laquelle les données sont demandées est plus lente que la vitesse à laquelle elles sont consommées ou, en d'autres termes, lorsque l'on passe plus de temps à demander des données qu'à les traiter.

Vous devrez probablement demander à votre thread principal d'effectuer la lecture et d'ajouter les données au pool lorsqu'un sous-processus devient disponible. Cela sera différent de l'utilisation de map .

Comme vous traitez une ligne à la fois et que les entrées sont divisées, vous pouvez utiliser fileinput pour itérer sur les lignes de plusieurs fichiers, et correspondre à une fonction traitant les lignes au lieu des fichiers :

Le passage d'une ligne à la fois peut être trop lent, nous pouvons donc demander à la carte de passer des morceaux, et nous pouvons ajuster jusqu'à ce que nous trouvions un point d'équilibre. Notre fonction analyse des morceaux de lignes :

def _parse_coreset_points(lines):
    return Points([_parse_coreset_point(line) for line in lines])

def _parse_coreset_point(line):
    s = line.split()
    x, y = [int(v) for v in s]
    return CoresetPoint(x, y)

Et notre fonction principale :

import fileinput

def getParsedFiles(directory):
    pool = Pool(2)

    txts = [filename for filename in os.listdir(directory):
            if filename.endswith(".txt")]

    return pool.imap(_parse_coreset_points, fileinput.input(txts), chunksize=100)

7voto

Danny_ds Points 5095

En général, ce n'est pas une bonne idée de lire simultanément sur le même disque dur physique (en rotation) à partir de plusieurs threads, car chaque commutation entraîne un délai supplémentaire d'environ 10 ms pour positionner la tête de lecture du disque dur (ce délai serait différent sur un disque SSD).

Comme @peter-wood l'a déjà dit, il est préférable qu'un thread lise les données et que d'autres threads les traitent.

Par ailleurs, pour tester réellement la différence, je pense que vous devriez effectuer le test avec des fichiers plus volumineux. Par exemple : les disques durs actuels devraient pouvoir lire environ 100MB/sec. Ainsi, la lecture des données d'un fichier de 100 Ko en une seule fois prendrait 1 ms, tandis que le positionnement de la tête de lecture au début de ce fichier prendrait 10 ms.

D'autre part, en regardant vos chiffres (en supposant qu'ils concernent une seule boucle), il est difficile de croire que le fait d'être limité en E/S est le seul problème ici. Le total des données est de 100 Mo, ce qui devrait prendre 1 seconde à lire depuis le disque plus quelques frais généraux, mais votre programme prend 130 secondes. Je ne sais pas si ce chiffre correspond aux fichiers froids sur le disque, ou à une moyenne de plusieurs tests où les données sont déjà mises en cache par le système d'exploitation (avec 62 Go de RAM, toutes ces données devraient être mises en cache la deuxième fois) - il serait intéressant de voir les deux chiffres.

Il doit donc y avoir autre chose. Regardons de plus près votre boucle :

for line in f:
    s = line.split()
    x, y = [int(v) for v in s]
    obj = CoresetPoint(x, y)
    gc.disable()
    myList.append(obj)
    gc.enable()

Bien que je ne connaisse pas Python, je pense que l'on peut dire que le gc Ce sont les appels qui posent problème. Ils sont appelés pour chaque ligne lue sur le disque. Je ne sais pas à quel point ces appels sont coûteux (ou ce qu'il en est si gc.enable() déclenche une collecte d'ordures par exemple) et pourquoi ils seraient nécessaires autour de append(obj) seulement, mais il peut y avoir d'autres problèmes parce qu'il s'agit de multithreading :

En supposant que le gc est global (c'est-à-dire qu'il n'est pas local à un thread), vous pourriez avoir quelque chose comme ceci :

thread 1 : gc.disable()
# switch to thread 2
thread 2 : gc.disable()
thread 2 : myList.append(obj)
thread 2 : gc.enable()
# gc now enabled!
# switch back to thread 1 (or one of the other threads)
thread 1 : myList.append(obj)
thread 1 : gc.enable()

Et si le nombre de threads <= nombre de cœurs, il n'y aurait même pas de commutation, ils l'appelleraient tous en même temps.

De même, si le gc est à l'abri des threads (ce serait pire s'il ne l'était pas), il devrait se verrouiller afin de modifier son état interne en toute sécurité, ce qui obligerait tous les autres threads à attendre.

Par exemple, gc.disable() ressemblerait à quelque chose comme ceci :

def disable()
    lock()  # all other threads are blocked for gc calls now
    alter internal data
    unlock()

Et parce que gc.disable() y gc.enable() sont appelées dans une boucle serrée, ce qui nuira aux performances lors de l'utilisation de plusieurs threads.

Il serait donc préférable de supprimer ces appels, ou de les placer au début et à la fin de votre programme s'ils sont vraiment nécessaires (ou de ne désactiver que les appels à l'aide de la fonction d'appel d'urgence). gc au début, il n'est pas nécessaire de faire gc juste avant de quitter le programme).

Selon la façon dont Python copie ou déplace les objets, il peut être légèrement préférable d'utiliser myList.append(CoresetPoint(x, y)) .

Il serait donc intéressant de tester la même chose sur un fichier de 100 Mo avec un seul thread et sans la fonction gc appels.

Si le traitement prend plus de temps que la lecture (c'est-à-dire s'il n'y a pas de contraintes d'E/S), utilisez un thread pour lire les données dans un tampon (cela devrait prendre 1 ou 2 secondes pour un fichier de 100 Mo s'il n'est pas déjà mis en cache), et plusieurs threads pour traiter les données (mais toujours sans ces contraintes d'E/S). gc dans cette boucle serrée).

Il n'est pas nécessaire de diviser les données en plusieurs fichiers pour pouvoir utiliser les threads. Il suffit de les laisser traiter différentes parties du même fichier (même avec le fichier de 14 Go).

Prograide.com

Prograide est une communauté de développeurs qui cherche à élargir la connaissance de la programmation au-delà de l'anglais.
Pour cela nous avons les plus grands doutes résolus en français et vous pouvez aussi poser vos propres questions ou résoudre celles des autres.

Powered by:

X