98 votes

Le multiprocesseur Python écrit en toute sécurité dans un fichier

J'essaie de résoudre un gros problème numérique qui implique de nombreux sous-problèmes, et j'utilise le module de multitraitement de Python (spécifiquement Pool.map) pour répartir les différents sous-problèmes indépendants sur différents cœurs. Chaque sous-problème implique le calcul de nombreux sous-sous-problèmes, et j'essaie de mémoriser efficacement ces résultats en les stockant dans un fichier s'ils n'ont pas encore été calculés par un processus, sinon je saute le calcul et je lis simplement les résultats du fichier.

J'ai des problèmes de concurrence avec les fichiers : différents processus vérifient parfois si un sous-sous-problème a déjà été calculé (en cherchant le fichier où les résultats seraient stockés), voient que ce n'est pas le cas, exécutent le calcul, puis essaient d'écrire les résultats dans le même fichier en même temps. Comment éviter les collisions d'écriture de ce genre ?

159voto

MikeHunter Points 1388

@GP89 a mentionné une bonne solution. Utilisez une file d'attente pour envoyer les tâches d'écriture à un processus dédié qui a un accès unique en écriture au fichier. Tous les autres travailleurs ont un accès en lecture seule. Cela permet d'éliminer les collisions. Voici un exemple qui utilise apply_async, mais cela fonctionnera aussi avec map :

import multiprocessing as mp
import time

fn = 'c:/temp/temp.txt'

def worker(arg, q):
    '''stupidly simulates long running process'''
    start = time.clock()
    s = 'this is a test'
    txt = s
    for i in range(200000):
        txt += s 
    done = time.clock() - start
    with open(fn, 'rb') as f:
        size = len(f.read())
    res = 'Process' + str(arg), str(size), done
    q.put(res)
    return res

def listener(q):
    '''listens for messages on the q, writes to file. '''

    with open(fn, 'w') as f:
        while 1:
            m = q.get()
            if m == 'kill':
                f.write('killed')
                break
            f.write(str(m) + '\n')
            f.flush()

def main():
    #must use Manager queue here, or will not work
    manager = mp.Manager()
    q = manager.Queue()    
    pool = mp.Pool(mp.cpu_count() + 2)

    #put listener to work first
    watcher = pool.apply_async(listener, (q,))

    #fire off workers
    jobs = []
    for i in range(80):
        job = pool.apply_async(worker, (i, q))
        jobs.append(job)

    # collect results from the workers through the pool result queue
    for job in jobs: 
        job.get()

    #now we are done, kill the listener
    q.put('kill')
    pool.close()
    pool.join()

if __name__ == "__main__":
   main()

2voto

fizix137 Points 100

Il me semble que vous devez utiliser Manager pour enregistrer temporairement vos résultats dans une liste, puis écrire les résultats de la liste dans un fichier. Vous pouvez également utiliser starmap pour passer l'objet que vous voulez traiter et la liste des gérés. La première étape est de construire le paramètre qui sera passé à starmap qui comprend la liste gérée.

from multiprocessing import Manager
from multiprocessing import Pool  
import pandas as pd

def worker(row, param):
    # do something here and then append it to row
    x = param**2
    row.append(x)

if __name__ == '__main__':
    pool_parameter = [] # list of objects to process
    with Manager() as mgr:
        row = mgr.list([])

        # build list of parameters to send to starmap
        for param in pool_parameter:
            params.append([row,param])

        with Pool() as p:
            p.starmap(worker, params)

À partir de là, vous devez décider comment vous allez traiter la liste. Si vous avez des tonnes de RAM et un énorme ensemble de données, n'hésitez pas à concaténer en utilisant pandas. Vous pouvez ensuite sauvegarder le fichier très facilement sous forme de csv ou de pickle.

        df = pd.concat(row, ignore_index=True)

        df.to_pickle('data.pickle')
        df.to_csv('data.csv')

Prograide.com

Prograide est une communauté de développeurs qui cherche à élargir la connaissance de la programmation au-delà de l'anglais.
Pour cela nous avons les plus grands doutes résolus en français et vous pouvez aussi poser vos propres questions ou résoudre celles des autres.

Powered by:

X