Ce que vous recherchez, c'est un modèle de producteur/consommateur.
Exemple de filetage de base
Voici un exemple de base utilisant le module d'enfilage (au lieu du multiprocessing)
import threading
import Queue
import sys
def do_work(in_queue, out_queue):
while True:
item = in_queue.get()
# process
result = item
out_queue.put(result)
in_queue.task_done()
if __name__ == "__main__":
work = Queue.Queue()
results = Queue.Queue()
total = 20
# start for workers
for i in xrange(4):
t = threading.Thread(target=do_work, args=(work, results))
t.daemon = True
t.start()
# produce data
for i in xrange(total):
work.put(i)
work.join()
# get the results
for i in xrange(total):
print results.get()
sys.exit()
Vous ne partageriez pas l'objet fichier avec les fils. Vous produisez du travail pour eux en fournissant l'objet file d'attente avec des lignes de données. Ensuite, chaque thread prendrait une ligne, la traiterait, puis la renverrait dans la file d'attente.
Il existe des fonctions plus avancées intégrées dans le module multiprocesseur pour partager des données, comme des listes et des un type particulier de file d'attente . Il y a des compromis à faire entre l'utilisation du multitraitement et des threads et cela dépend si votre travail est lié au processeur ou aux entrées-sorties.
Exemple de base de multiprocessing.pool
Voici un exemple très basique d'un pool de multitraitement
from multiprocessing import Pool
def process_line(line):
return "FOO: %s" % line
if __name__ == "__main__":
pool = Pool(4)
with open('file.txt') as source_file:
# chunk the work into batches of 4 lines at a time
results = pool.map(process_line, source_file, 4)
print results
Une piscine est un objet de commodité qui gère ses propres processus. Puisqu'un fichier ouvert peut itérer sur ses lignes, vous pouvez le passer à la fonction pool.map()
qui bouclera sur elle et fournira des lignes à la fonction de travailleur. Carte et renvoie le résultat complet lorsqu'il est terminé. Soyez conscient qu'il s'agit d'un exemple excessivement simplifié, et que la fonction pool.map()
va lire l'intégralité de votre fichier en mémoire en une seule fois avant d'effectuer le travail. Si vous vous attendez à avoir de gros fichiers, gardez cela à l'esprit. Il existe des moyens plus avancés de concevoir une configuration producteur/consommateur.
Pool" manuel avec limite et re-tri de ligne
Il s'agit d'un exemple manuel de la Pool.map mais au lieu de consommer un itérable entier en une seule fois, vous pouvez définir une taille de file d'attente de sorte que vous ne l'alimentez que morceau par morceau, aussi vite qu'il peut le faire. J'ai également ajouté les numéros de ligne afin que vous puissiez les suivre et vous y référer si vous le souhaitez, plus tard.
from multiprocessing import Process, Manager
import time
import itertools
def do_work(in_queue, out_list):
while True:
item = in_queue.get()
line_no, line = item
# exit signal
if line == None:
return
# fake work
time.sleep(.5)
result = (line_no, line)
out_list.append(result)
if __name__ == "__main__":
num_workers = 4
manager = Manager()
results = manager.list()
work = manager.Queue(num_workers)
# start for workers
pool = []
for i in xrange(num_workers):
p = Process(target=do_work, args=(work, results))
p.start()
pool.append(p)
# produce data
with open("source.txt") as f:
iters = itertools.chain(f, (None,)*num_workers)
for num_and_line in enumerate(iters):
work.put(num_and_line)
for p in pool:
p.join()
# get the results
# example: [(1, "foo"), (10, "bar"), (0, "start")]
print sorted(results)