142 votes

Utiliser un tableau numpy en mémoire partagée pour le multitraitement

Je voudrais utiliser un tableau numpy en mémoire partagée pour l'utiliser avec le module multiprocesseur. La difficulté est de l'utiliser comme un tableau numpy, et pas seulement comme un tableau ctypes.

from multiprocessing import Process, Array
import scipy

def f(a):
    a[0] = -a[0]

if __name__ == '__main__':
    # Create the array
    N = int(10)
    unshared_arr = scipy.rand(N)
    arr = Array('d', unshared_arr)
    print "Originally, the first two elements of arr = %s"%(arr[:2])

    # Create, start, and finish the child processes
    p = Process(target=f, args=(arr,))
    p.start()
    p.join()

    # Printing out the changed values
    print "Now, the first two elements of arr = %s"%arr[:2]

Cela produit des résultats tels que :

Originally, the first two elements of arr = [0.3518653236697369, 0.517794725524976]
Now, the first two elements of arr = [-0.3518653236697369, 0.517794725524976]

Le tableau peut être accédé de manière ctypique, par ex. arr[i] a un sens. Cependant, il ne s'agit pas d'un tableau numpy, et je ne peux pas effectuer des opérations telles que -1*arr ou arr.sum() . Je suppose qu'une solution serait de convertir le tableau ctypes en un tableau numpy. Cependant (en plus de ne pas être capable de faire fonctionner cette solution), je ne pense pas qu'elle serait partagée.

Il semble qu'il y aurait une solution standard à ce qui doit être un problème commun.

1 votes

Ce n'est pas le même que celui-là ? stackoverflow.com/questions/5033799/

1 votes

Ce n'est pas tout à fait la même question. La question liée porte sur subprocess plutôt que multiprocessing .

102voto

J.F. Sebastian Points 102961

Pour compléter les réponses de @unutbu (qui n'est plus disponible) et de @Henry Gomersall. Vous pourriez utiliser shared_arr.get_lock() pour synchroniser l'accès en cas de besoin :

shared_arr = mp.Array(ctypes.c_double, N)
# ...
def f(i): # could be anything numpy accepts as an index such another numpy array
    with shared_arr.get_lock(): # synchronize access
        arr = np.frombuffer(shared_arr.get_obj()) # no data copying
        arr[i] = -arr[i]

Exemple

import ctypes
import logging
import multiprocessing as mp

from contextlib import closing

import numpy as np

info = mp.get_logger().info

def main():
    logger = mp.log_to_stderr()
    logger.setLevel(logging.INFO)

    # create shared array
    N, M = 100, 11
    shared_arr = mp.Array(ctypes.c_double, N)
    arr = tonumpyarray(shared_arr)

    # fill with random values
    arr[:] = np.random.uniform(size=N)
    arr_orig = arr.copy()

    # write to arr from different processes
    with closing(mp.Pool(initializer=init, initargs=(shared_arr,))) as p:
        # many processes access the same slice
        stop_f = N // 10
        p.map_async(f, [slice(stop_f)]*M)

        # many processes access different slices of the same array
        assert M % 2 # odd
        step = N // 10
        p.map_async(g, [slice(i, i + step) for i in range(stop_f, N, step)])
    p.join()
    assert np.allclose(((-1)**M)*tonumpyarray(shared_arr), arr_orig)

def init(shared_arr_):
    global shared_arr
    shared_arr = shared_arr_ # must be inherited, not passed as an argument

def tonumpyarray(mp_arr):
    return np.frombuffer(mp_arr.get_obj())

def f(i):
    """synchronized."""
    with shared_arr.get_lock(): # synchronize access
        g(i)

def g(i):
    """no synchronization."""
    info("start %s" % (i,))
    arr = tonumpyarray(shared_arr)
    arr[i] = -1 * arr[i]
    info("end   %s" % (i,))

if __name__ == '__main__':
    mp.freeze_support()
    main()

Si vous n'avez pas besoin d'un accès synchronisé ou si vous créez vos propres verrous, alors mp.Array() n'est pas nécessaire. Vous pourriez utiliser mp.sharedctypes.RawArray dans ce cas.

2 votes

Belle réponse ! Si je veux avoir plus d'un tableau partagé, chacun verrouillable séparément, mais avec le nombre de tableaux déterminé au moment de l'exécution, est-ce une extension directe de ce que vous avez fait ici ?

4 votes

@Andrew : les tableaux partagés devraient être créés avant les processus enfants sont créés.

0 votes

Bon point sur l'ordre des opérations. C'est ce que j'avais en tête : créer un nombre de tableaux partagés spécifié par l'utilisateur, puis créer quelques processus enfants. Est-ce que c'est simple ?

22voto

EelkeSpaak Points 1551

Bien que les réponses déjà données soient bonnes, il existe une solution beaucoup plus simple à ce problème si deux conditions sont remplies :

  1. Vous êtes sur un Conforme à POSIX système d'exploitation (par exemple, Linux, Mac OSX) ; et
  2. Les processus de votre enfant nécessitent accès en lecture seule au tableau partagé.

Dans ce cas, il n'est pas nécessaire de faire en sorte que les variables soient explicitement partagées, car les processus enfants seront créés en utilisant une fourche. Un enfant bifurqué partage automatiquement l'espace mémoire du parent. Dans le contexte du multitraitement Python, cela signifie qu'il partage toutes les variables de l'espace mémoire du parent. au niveau du module variables ; notez que cette ne tient pas pour les arguments que vous passez explicitement à vos processus enfants ou aux fonctions que vous appelez sur une page d'accueil. multiprocessing.Pool ou à peu près.

Un exemple simple :

import multiprocessing
import numpy as np

# will hold the (implicitly mem-shared) data
data_array = None

# child worker function
def job_handler(num):
    # built-in id() returns unique memory ID of a variable
    return id(data_array), np.sum(data_array)

def launch_jobs(data, num_jobs=5, num_worker=4):
    global data_array
    data_array = data

    pool = multiprocessing.Pool(num_worker)
    return pool.map(job_handler, range(num_jobs))

# create some random data and execute the child jobs
mem_ids, sumvals = zip(*launch_jobs(np.random.rand(10)))

# this will print 'True' on POSIX OS, since the data was shared
print(np.all(np.asarray(mem_ids) == id(data_array)))

5 votes

+1 Info vraiment précieuse. Pouvez-vous expliquer pourquoi seules les variables de niveau module sont partagées ? Pourquoi les variables locales ne font pas partie de l'espace mémoire du parent ? Par exemple, pourquoi cela ne peut pas fonctionner si j'ai une fonction F avec une variable locale V et une fonction G à l'intérieur de F qui fait référence à V ?

14 votes

Avertissement : Cette réponse est un peu trompeuse. Le processus enfant reçoit une copie de l'état du processus parent, y compris les variables globales, au moment du fork. Les états ne sont en aucun cas synchronisés et divergeront à partir de ce moment. Cette technique peut être utile dans certains scénarios (par exemple, la bifurcation de processus enfants ad hoc qui traitent chacun un instantané du processus parent et se terminent ensuite), mais elle est inutile dans d'autres (par exemple, les processus enfants à long terme qui doivent partager et synchroniser des données avec le processus parent).

1 votes

@DavidStein : Oui, je pense avoir mentionné explicitement dans la réponse (point 2) que la synchronisation des données avec le processus parent (nécessitant donc plus qu'un accès en lecture seule) empêchera cette technique de fonctionner comme prévu. Dans le cas de processus enfants effectuant plus d'une tâche, etc., vous feriez mieux d'utiliser une mémoire partagée explicite et de ne pas compter sur la sémantique de la fourche. Dans de nombreux cas, cependant, l'astuce de la bifurcation fonctionne très bien pour moi (par exemple, chaque processus enfant effectue un calcul coûteux sur une tranche d'une matrice présente dans la mémoire parentale).

21voto

Henry Gomersall Points 2916

El Array a un get_obj() qui lui est associée, qui renvoie le tableau ctypes qui présente une interface de tampon. Je pense que ce qui suit devrait fonctionner...

from multiprocessing import Process, Array
import scipy
import numpy

def f(a):
    a[0] = -a[0]

if __name__ == '__main__':
    # Create the array
    N = int(10)
    unshared_arr = scipy.rand(N)
    a = Array('d', unshared_arr)
    print "Originally, the first two elements of arr = %s"%(a[:2])

    # Create, start, and finish the child process
    p = Process(target=f, args=(a,))
    p.start()
    p.join()

    # Print out the changed values
    print "Now, the first two elements of arr = %s"%a[:2]

    b = numpy.frombuffer(a.get_obj())

    b[0] = 10.0
    print a[0]

Lorsqu'elle est exécutée, elle imprime le premier élément de a qui est maintenant de 10.0, montrant a y b ne sont que deux vues de la même mémoire.

Afin de s'assurer qu'il est toujours sûr pour les multiprocesseurs, je pense que vous devrez utiliser la fonction acquire y release qui existent sur le Array objet, a et son verrou intégré pour s'assurer que tout est accessible en toute sécurité (bien que je ne sois pas un expert du module multiprocesseur).

0 votes

Cela ne fonctionnera pas sans synchronisation comme @unutbu l'a démontré dans sa réponse (maintenant supprimée).

1 votes

Je suppose que si vous voulez simplement accéder au tableau après le traitement, vous pouvez le faire proprement sans vous soucier des problèmes de concurrence et de verrouillage ?

0 votes

Dans ce cas, vous n'avez pas besoin mp.Array .

12voto

mat Points 129

J'ai écrit un petit module python qui utilise la mémoire partagée POSIX pour partager des tableaux numpy entre interprètes python. Peut-être le trouverez-vous pratique.

https://pypi.python.org/pypi/SharedArray

Voici comment cela fonctionne :

import numpy as np
import SharedArray as sa

# Create an array in shared memory
a = sa.create("test1", 10)

# Attach it as a different array. This can be done from another
# python interpreter as long as it runs on the same computer.
b = sa.attach("test1")

# See how they are actually sharing the same memory block
a[0] = 42
print(b[0])

# Destroying a does not affect b.
del a
print(b[0])

# See how "test1" is still present in shared memory even though we
# destroyed the array a.
sa.list()

# Now destroy the array "test1" from memory.
sa.delete("test1")

# The array b is not affected, but once you destroy it then the
# data are lost.
print(b[0])

10voto

Velimir Mlaker Points 1419

Vous pouvez utiliser le sharedmem module : https://bitbucket.org/cleemesser/numpy-sharedmem

Voici donc votre code original, cette fois-ci en utilisant une mémoire partagée qui se comporte comme un tableau NumPy (notez la dernière instruction supplémentaire appelant un tableau NumPy sum() fonction) :

from multiprocessing import Process
import sharedmem
import scipy

def f(a):
    a[0] = -a[0]

if __name__ == '__main__':
    # Create the array
    N = int(10)
    unshared_arr = scipy.rand(N)
    arr = sharedmem.empty(N)
    arr[:] = unshared_arr.copy()
    print "Originally, the first two elements of arr = %s"%(arr[:2])

    # Create, start, and finish the child process
    p = Process(target=f, args=(arr,))
    p.start()
    p.join()

    # Print out the changed values
    print "Now, the first two elements of arr = %s"%arr[:2]

    # Perform some NumPy operation
    print arr.sum()

2 votes

Remarque : ce système n'est plus développé et ne semble pas fonctionner sous linux. github.com/sturlamolden/sharedmem-numpy/issues/4

Prograide.com

Prograide est une communauté de développeurs qui cherche à élargir la connaissance de la programmation au-delà de l'anglais.
Pour cela nous avons les plus grands doutes résolus en français et vous pouvez aussi poser vos propres questions ou résoudre celles des autres.

Powered by:

X