87 votes

Traitement d'un seul fichier à partir de plusieurs processus

J'ai un seul gros fichier texte dans lequel je veux traiter chaque ligne (faire quelques opérations) et les stocker dans une base de données. Comme un seul programme simple prend trop de temps, je veux que cela soit fait via plusieurs processus ou threads. Chaque thread/processus doit lire les DIFFERENTES données (différentes lignes) de ce fichier unique et faire quelques opérations sur leur morceau de données (lignes) et les mettre dans la base de données de sorte qu'à la fin, j'ai la totalité des données traitées et ma base de données est vidée avec les données dont j'ai besoin.

Mais je ne parviens pas à comprendre comment aborder cette question.

115voto

jdi Points 38029

Ce que vous recherchez, c'est un modèle de producteur/consommateur.

Exemple de filetage de base

Voici un exemple de base utilisant le module d'enfilage (au lieu du multiprocessing)

import threading
import Queue
import sys

def do_work(in_queue, out_queue):
    while True:
        item = in_queue.get()
        # process
        result = item
        out_queue.put(result)
        in_queue.task_done()

if __name__ == "__main__":
    work = Queue.Queue()
    results = Queue.Queue()
    total = 20

    # start for workers
    for i in xrange(4):
        t = threading.Thread(target=do_work, args=(work, results))
        t.daemon = True
        t.start()

    # produce data
    for i in xrange(total):
        work.put(i)

    work.join()

    # get the results
    for i in xrange(total):
        print results.get()

    sys.exit()

Vous ne partageriez pas l'objet fichier avec les fils. Vous produisez du travail pour eux en fournissant l'objet file d'attente avec des lignes de données. Ensuite, chaque thread prendrait une ligne, la traiterait, puis la renverrait dans la file d'attente.

Il existe des fonctions plus avancées intégrées dans le module multiprocesseur pour partager des données, comme des listes et des un type particulier de file d'attente . Il y a des compromis à faire entre l'utilisation du multitraitement et des threads et cela dépend si votre travail est lié au processeur ou aux entrées-sorties.

Exemple de base de multiprocessing.pool

Voici un exemple très basique d'un pool de multitraitement

from multiprocessing import Pool

def process_line(line):
    return "FOO: %s" % line

if __name__ == "__main__":
    pool = Pool(4)
    with open('file.txt') as source_file:
        # chunk the work into batches of 4 lines at a time
        results = pool.map(process_line, source_file, 4)

    print results

Une piscine est un objet de commodité qui gère ses propres processus. Puisqu'un fichier ouvert peut itérer sur ses lignes, vous pouvez le passer à la fonction pool.map() qui bouclera sur elle et fournira des lignes à la fonction de travailleur. Carte et renvoie le résultat complet lorsqu'il est terminé. Soyez conscient qu'il s'agit d'un exemple excessivement simplifié, et que la fonction pool.map() va lire l'intégralité de votre fichier en mémoire en une seule fois avant d'effectuer le travail. Si vous vous attendez à avoir de gros fichiers, gardez cela à l'esprit. Il existe des moyens plus avancés de concevoir une configuration producteur/consommateur.

Pool" manuel avec limite et re-tri de ligne

Il s'agit d'un exemple manuel de la Pool.map mais au lieu de consommer un itérable entier en une seule fois, vous pouvez définir une taille de file d'attente de sorte que vous ne l'alimentez que morceau par morceau, aussi vite qu'il peut le faire. J'ai également ajouté les numéros de ligne afin que vous puissiez les suivre et vous y référer si vous le souhaitez, plus tard.

from multiprocessing import Process, Manager
import time
import itertools 

def do_work(in_queue, out_list):
    while True:
        item = in_queue.get()
        line_no, line = item

        # exit signal 
        if line == None:
            return

        # fake work
        time.sleep(.5)
        result = (line_no, line)

        out_list.append(result)

if __name__ == "__main__":
    num_workers = 4

    manager = Manager()
    results = manager.list()
    work = manager.Queue(num_workers)

    # start for workers    
    pool = []
    for i in xrange(num_workers):
        p = Process(target=do_work, args=(work, results))
        p.start()
        pool.append(p)

    # produce data
    with open("source.txt") as f:
        iters = itertools.chain(f, (None,)*num_workers)
        for num_and_line in enumerate(iters):
            work.put(num_and_line)

    for p in pool:
        p.join()

    # get the results
    # example:  [(1, "foo"), (10, "bar"), (0, "start")]
    print sorted(results)

9voto

mgilson Points 92954

Voici un exemple vraiment stupide que j'ai inventé :

import os.path
import multiprocessing

def newlinebefore(f,n):
    f.seek(n)
    c=f.read(1)
    while c!='\n' and n > 0:
        n-=1
        f.seek(n)
        c=f.read(1)

    f.seek(n)
    return n

filename='gpdata.dat'  #your filename goes here.
fsize=os.path.getsize(filename) #size of file (in bytes)

#break the file into 20 chunks for processing.
nchunks=20
initial_chunks=range(1,fsize,fsize/nchunks)

#You could also do something like:
#initial_chunks=range(1,fsize,max_chunk_size_in_bytes) #this should work too.

with open(filename,'r') as f:
    start_byte=sorted(set([newlinebefore(f,i) for i in initial_chunks]))

end_byte=[i-1 for i in start_byte] [1:] + [None]

def process_piece(filename,start,end):
    with open(filename,'r') as f:
        f.seek(start+1)
        if(end is None):
            text=f.read()
        else: 
            nbytes=end-start+1
            text=f.read(nbytes)

    # process text here. createing some object to be returned
    # You could wrap text into a StringIO object if you want to be able to
    # read from it the way you would a file.

    returnobj=text
    return returnobj

def wrapper(args):
    return process_piece(*args)

filename_repeated=[filename]*len(start_byte)
args=zip(filename_repeated,start_byte,end_byte)

pool=multiprocessing.Pool(4)
result=pool.map(wrapper,args)

#Now take your results and write them to the database.
print "".join(result)  #I just print it to make sure I get my file back ...

La partie la plus délicate ici est de s'assurer que nous divisons le fichier sur les caractères de nouvelle ligne afin de ne manquer aucune ligne (ou de ne lire que des lignes partielles). Ensuite, chaque processus lit sa partie du fichier et renvoie un objet qui peut être placé dans la base de données par le thread principal. Bien sûr, vous pouvez même avoir besoin de faire cette partie par morceaux afin de ne pas avoir à garder toutes les informations en mémoire en même temps. (ceci est assez facile à réaliser -- il suffit de diviser la liste des "args" en X morceaux et d'appeler pool.map(wrapper,chunk) -- Voir aquí )

-2voto

Tanu Points 209

Il est possible de diviser le gros fichier unique en plusieurs fichiers plus petits et de traiter chacun d'entre eux dans des threads séparés.

Prograide.com

Prograide est une communauté de développeurs qui cherche à élargir la connaissance de la programmation au-delà de l'anglais.
Pour cela nous avons les plus grands doutes résolus en français et vous pouvez aussi poser vos propres questions ou résoudre celles des autres.

Powered by:

X