150 votes

Objets à mémoire partagée en multitraitement python

Supposons que j'ai une grande mémoire un tableau numpy, j'ai une fonction func que prend dans ce tableau géant en entrée (en collaboration avec d'autres paramètres). func avec les différents paramètres à utiliser peuvent être exécutées en parallèle. Par exemple

def func(arr, param):
    # do stuff to arr, param

# build array arr

pool = Pool(processes = 6)
results = [pool.apply_async(func, [arr, param]) for param in all_params]
output = [res.get() for res in results]

Si j'utilise le multitraitement bibliothèque, alors que le tableau géant sera copié plusieurs fois dans différents processus.

Est-il un moyen pour permettre aux différents processus partagent le même tableau? Ce tableau objet est en lecture seule et ne sera jamais modifié.

Ce qui est plus compliqué, si l'arr n'est pas un tableau, mais l'arbitraire d'un objet python, est-il un moyen de le partager?

[ÉDITÉ]

J'ai lu la réponse, mais je suis encore un peu confus. Depuis fork() est copy-on-write, il ne faut pas se prévaloir de tout coût supplémentaire lors de la ponte de nouveaux processus en python de multitraitement de la bibliothèque. Mais le code suivant permet de croire qu'il est une surcharge énorme:

from multiprocessing import Pool, Manager
import numpy as np; 
import time

def f(arr):
    return len(arr)

t = time.time()
arr = np.arange(10000000)
print "construct array = ", time.time() - t;


pool = Pool(processes = 6)

t = time.time()
res = pool.apply_async(f, [arr,])
res.get()
print "multiprocessing overhead = ", time.time() - t;

de sortie (et par la manière, le coût augmente à mesure que la taille de la matrice augmente, donc je pense qu'il y est encore des frais généraux liés à la mémoire de la copie):

construct array =  0.0178790092468
multiprocessing overhead =  0.252444982529

Pourquoi il y a surcharge énorme, si nous n'avons pas copier le tableau? Et quel rôle la mémoire partagée pour me sauver?

143voto

Francis Avila Points 18236

Si vous utilisez un système d'exploitation qui utilise copy-on-write fork() de la sémantique (comme tout le common unix), alors tant que vous ne modifiez jamais votre structure de données, il sera disponible pour tous les processus enfants sans prendre de mémoire supplémentaire. Vous n'aurez pas à faire quelque chose de spécial (à l'exception de faire absolument sûr que vous n'avez pas modifié l'objet).

Les plus efficaces chose que vous pouvez faire pour votre problème serait d'apporter votre tableau en un efficace de la structure du tableau (à l'aide d' numpy ou array), lieu que dans la mémoire partagée, l'envelopper avec multiprocessing.Array, et le passer à vos fonctions. Cette réponse montre comment le faire.

Si vous voulez une écriture de l'objet partagé, alors vous aurez besoin pour l'envelopper avec une sorte de synchronisation ou de verrouillage. multiprocessing fournit deux méthodes: l'une utilisant la mémoire partagée (adapté pour des valeurs simples, des tableaux, ou ctypes) ou un Manager proxy, où un processus conserve la mémoire et un manager joue le rôle d'arbitre de l'accès à partir d'autres processus (même sur un réseau).

L' Manager approche peut être utilisée avec l'arbitraire des objets Python, mais sera plus lent que l'équivalent de l'aide mémoire partagée parce que les objets ont besoin de sérialiser/désérialiser et envoyé entre les processus.

Il y a une richesse de traitement en parallèle des bibliothèques et des approches disponibles en Python. multiprocessing est un excellent et bien arrondi de la bibliothèque, mais si vous avez des besoins spéciaux peut-être l'un des autres approches, peut-être mieux.

17voto

Je rencontre le même problème et ai écrit une petite classe d’utilitaires à mémoire partagée pour la contourner.

J'utilise le multitraitement.RawArray (lockfree), et l'accès aux tableaux n'est pas du tout synchronisé (lockfree), veillez à ne pas vous prendre à vous-même.

Avec la solution, les accélérations sont multipliées par 3 sur un i7 quad-core.

Voici le code: N'hésitez pas à l'utiliser et à l'améliorer, et s'il vous plaît rapportez tout bogue.

 '''
Created on 14.05.2013

@author: martin
'''

import multiprocessing
import ctypes
import numpy as np

class SharedNumpyMemManagerError(Exception):
    pass

'''
Singleton Pattern
'''
class SharedNumpyMemManager:    

    _initSize = 1024

    _instance = None

    def __new__(cls, *args, **kwargs):
        if not cls._instance:
            cls._instance = super(SharedNumpyMemManager, cls).__new__(
                                cls, *args, **kwargs)
        return cls._instance        

    def __init__(self):
        self.lock = multiprocessing.Lock()
        self.cur = 0
        self.cnt = 0
        self.shared_arrays = [None] * SharedNumpyMemManager._initSize

    def __createArray(self, dimensions, ctype=ctypes.c_double):

        self.lock.acquire()

        # double size if necessary
        if (self.cnt >= len(self.shared_arrays)):
            self.shared_arrays = self.shared_arrays + [None] * len(self.shared_arrays)

        # next handle
        self.__getNextFreeHdl()        

        # create array in shared memory segment
        shared_array_base = multiprocessing.RawArray(ctype, np.prod(dimensions))

        # convert to numpy array vie ctypeslib
        self.shared_arrays[self.cur] = np.ctypeslib.as_array(shared_array_base)

        # do a reshape for correct dimensions            
        # Returns a masked array containing the same data, but with a new shape.
        # The result is a view on the original array
        self.shared_arrays[self.cur] = self.shared_arrays[self.cnr].reshape(dimensions)

        # update cnt
        self.cnt += 1

        self.lock.release()

        # return handle to the shared memory numpy array
        return self.cur

    def __getNextFreeHdl(self):
        orgCur = self.cur
        while self.shared_arrays[self.cur] is not None:
            self.cur = (self.cur + 1) % len(self.shared_arrays)
            if orgCur == self.cur:
                raise SharedNumpyMemManagerError('Max Number of Shared Numpy Arrays Exceeded!')

    def __freeArray(self, hdl):
        self.lock.acquire()
        # set reference to None
        if self.shared_arrays[hdl] is not None: # consider multiple calls to free
            self.shared_arrays[hdl] = None
            self.cnt -= 1
        self.lock.release()

    def __getArray(self, i):
        return self.shared_arrays[i]

    @staticmethod
    def getInstance():
        if not SharedNumpyMemManager._instance:
            SharedNumpyMemManager._instance = SharedNumpyMemManager()
        return SharedNumpyMemManager._instance

    @staticmethod
    def createArray(*args, **kwargs):
        return SharedNumpyMemManager.getInstance().__createArray(*args, **kwargs)

    @staticmethod
    def getArray(*args, **kwargs):
        return SharedNumpyMemManager.getInstance().__getArray(*args, **kwargs)

    @staticmethod    
    def freeArray(*args, **kwargs):
        return SharedNumpyMemManager.getInstance().__freeArray(*args, **kwargs)

# Init Singleton on module load
SharedNumpyMemManager.getInstance()

if __name__ == '__main__':

    import timeit

    N_PROC = 8
    INNER_LOOP = 10000
    N = 1000

    def propagate(t):
        i, shm_hdl, evidence = t
        a = SharedNumpyMemManager.getArray(shm_hdl)
        for j in range(INNER_LOOP):
            a[i] = i

    class Parallel_Dummy_PF:

        def __init__(self, N):
            self.N = N
            self.arrayHdl = SharedNumpyMemManager.createArray(self.N, ctype=ctypes.c_double)            
            self.pool = multiprocessing.Pool(processes=N_PROC)

        def update_par(self, evidence):
            self.pool.map(propagate, zip(range(self.N), [self.arrayHdl] * self.N, [evidence] * self.N))

        def update_seq(self, evidence):
            for i in range(self.N):
                propagate((i, self.arrayHdl, evidence))

        def getArray(self):
            return SharedNumpyMemManager.getArray(self.arrayHdl)

    def parallelExec():
        pf = Parallel_Dummy_PF(N)
        print(pf.getArray())
        pf.update_par(5)
        print(pf.getArray())

    def sequentialExec():
        pf = Parallel_Dummy_PF(N)
        print(pf.getArray())
        pf.update_seq(5)
        print(pf.getArray())

    t1 = timeit.Timer("sequentialExec()", "from __main__ import sequentialExec")
    t2 = timeit.Timer("parallelExec()", "from __main__ import parallelExec")

    print("Sequential: ", t1.timeit(number=1))    
    print("Parallel: ", t2.timeit(number=1))
 

Prograide.com

Prograide est une communauté de développeurs qui cherche à élargir la connaissance de la programmation au-delà de l'anglais.
Pour cela nous avons les plus grands doutes résolus en français et vous pouvez aussi poser vos propres questions ou résoudre celles des autres.

Powered by:

X