150 votes

le multiprocessing : Comment partager un dictateur entre plusieurs processus ?

Un programme qui crée plusieurs processus qui travaillent sur une file d'attente joignable, Q et peut éventuellement manipuler un dictionnaire global. D pour stocker les résultats. (ainsi, chaque processus enfant peut utiliser D pour stocker son résultat et voir également les résultats produits par les autres processus enfants)

Si j'imprime le dictionnaire D dans un processus enfant, je vois les modifications qui ont été faites sur lui (i.e. sur D). Mais après que le processus principal ait rejoint Q, si j'imprime D, c'est un dict vide !

Je comprends que c'est un problème de synchronisation/verrouillage. Quelqu'un peut-il me dire ce qui se passe ici, et comment je peux synchroniser l'accès à D ?

205voto

senderle Points 41607

Une réponse générale consiste à utiliser un Manager objet. Adapté de la documentation :

from multiprocessing import Process, Manager

def f(d):
    d[1] += '1'
    d['2'] += 2

if __name__ == '__main__':
    manager = Manager()

    d = manager.dict()
    d[1] = '1'
    d['2'] = 2

    p1 = Process(target=f, args=(d,))
    p2 = Process(target=f, args=(d,))
    p1.start()
    p2.start()
    p1.join()
    p2.join()

    print d

Sortie :

$ python mul.py 
{1: '111', '2': 6}

31voto

Jeremy Brown Points 4950

Le multiprocessing n'est pas comme le threading. Chaque processus enfant obtient une copie de la mémoire du processus principal. En général, l'état est partagé via la communication (pipes/sockets), les signaux ou la mémoire partagée.

Le multiprocessing rend certaines abstractions disponibles pour votre cas d'utilisation - un état partagé qui est traité comme local par l'utilisation de proxies ou de mémoire partagée : http://docs.python.org/library/multiprocessing.html#sharing-state-between-processes

Sections pertinentes :

25voto

Brad Solomon Points 11873

En plus de ce que @senderle a dit ici, certains pourraient aussi se demander comment utiliser la fonctionnalité de multiprocessing.Pool .

Ce qui est bien, c'est qu'il y a un .Pool() à la méthode manager qui imite toutes les API familières de l'API de niveau supérieur. multiprocessing .

from itertools import repeat
import multiprocessing as mp
import os
import pprint

def f(d: dict) -> None:
    pid = os.getpid()
    d[pid] = "Hi, I was written by process %d" % pid

if __name__ == '__main__':
    with mp.Manager() as manager:
        d = manager.dict()
        with manager.Pool() as pool:
            pool.map(f, repeat(d, 10))
        # `d` is a DictProxy object that can be converted to dict
        pprint.pprint(dict(d))

Sortie :

$ python3 mul.py 
{22562: 'Hi, I was written by process 22562',
 22563: 'Hi, I was written by process 22563',
 22564: 'Hi, I was written by process 22564',
 22565: 'Hi, I was written by process 22565',
 22566: 'Hi, I was written by process 22566',
 22567: 'Hi, I was written by process 22567',
 22568: 'Hi, I was written by process 22568',
 22569: 'Hi, I was written by process 22569',
 22570: 'Hi, I was written by process 22570',
 22571: 'Hi, I was written by process 22571'}

Il s'agit d'un exemple légèrement différent où chaque processus enregistre simplement son ID de processus dans la base de données globale de l'entreprise. DictProxy objet d .

19voto

alyaxey Points 512

J'aimerais partager mon propre travail qui est plus rapide que le dict de Manager et est plus simple et plus stable que la bibliothèque pyshmht qui utilise des tonnes de mémoire et ne fonctionne pas pour Mac OS. Bien que mon dict ne fonctionne que pour les chaînes de caractères simples et est immuable actuellement. J'utilise une implémentation de sondage linéaire et stocke les paires de clés et de valeurs dans un bloc de mémoire séparé après la table.

from mmap import mmap
import struct
from timeit import default_timer
from multiprocessing import Manager
from pyshmht import HashTable

class shared_immutable_dict:
    def __init__(self, a):
        self.hs = 1 << (len(a) * 3).bit_length()
        kvp = self.hs * 4
        ht = [0xffffffff] * self.hs
        kvl = []
        for k, v in a.iteritems():
            h = self.hash(k)
            while ht[h] != 0xffffffff:
                h = (h + 1) & (self.hs - 1)
            ht[h] = kvp
            kvp += self.kvlen(k) + self.kvlen(v)
            kvl.append(k)
            kvl.append(v)

        self.m = mmap(-1, kvp)
        for p in ht:
            self.m.write(uint_format.pack(p))
        for x in kvl:
            if len(x) <= 0x7f:
                self.m.write_byte(chr(len(x)))
            else:
                self.m.write(uint_format.pack(0x80000000 + len(x)))
            self.m.write(x)

    def hash(self, k):
        h = hash(k)
        h = (h + (h >> 3) + (h >> 13) + (h >> 23)) * 1749375391 & (self.hs - 1)
        return h

    def get(self, k, d=None):
        h = self.hash(k)
        while True:
            x = uint_format.unpack(self.m[h * 4:h * 4 + 4])[0]
            if x == 0xffffffff:
                return d
            self.m.seek(x)
            if k == self.read_kv():
                return self.read_kv()
            h = (h + 1) & (self.hs - 1)

    def read_kv(self):
        sz = ord(self.m.read_byte())
        if sz & 0x80:
            sz = uint_format.unpack(chr(sz) + self.m.read(3))[0] - 0x80000000
        return self.m.read(sz)

    def kvlen(self, k):
        return len(k) + (1 if len(k) <= 0x7f else 4)

    def __contains__(self, k):
        return self.get(k, None) is not None

    def close(self):
        self.m.close()

uint_format = struct.Struct('>I')

def uget(a, k, d=None):
    return to_unicode(a.get(to_str(k), d))

def uin(a, k):
    return to_str(k) in a

def to_unicode(s):
    return s.decode('utf-8') if isinstance(s, str) else s

def to_str(s):
    return s.encode('utf-8') if isinstance(s, unicode) else s

def mmap_test():
    n = 1000000
    d = shared_immutable_dict({str(i * 2): '1' for i in xrange(n)})
    start_time = default_timer()
    for i in xrange(n):
        if bool(d.get(str(i))) != (i % 2 == 0):
            raise Exception(i)
    print 'mmap speed: %d gets per sec' % (n / (default_timer() - start_time))

def manager_test():
    n = 100000
    d = Manager().dict({str(i * 2): '1' for i in xrange(n)})
    start_time = default_timer()
    for i in xrange(n):
        if bool(d.get(str(i))) != (i % 2 == 0):
            raise Exception(i)
    print 'manager speed: %d gets per sec' % (n / (default_timer() - start_time))

def shm_test():
    n = 1000000
    d = HashTable('tmp', n)
    d.update({str(i * 2): '1' for i in xrange(n)})
    start_time = default_timer()
    for i in xrange(n):
        if bool(d.get(str(i))) != (i % 2 == 0):
            raise Exception(i)
    print 'shm speed: %d gets per sec' % (n / (default_timer() - start_time))

if __name__ == '__main__':
    mmap_test()
    manager_test()
    shm_test()

Sur mon ordinateur portable, les résultats sont les suivants :

mmap speed: 247288 gets per sec
manager speed: 33792 gets per sec
shm speed: 691332 gets per sec

exemple d'utilisation simple :

ht = shared_immutable_dict({'a': '1', 'b': '2'})
print ht.get('a')

3voto

felix021 Points 311

Vous pouvez peut-être essayer pyshmht Une extension de table de hachage basée sur la mémoire partagée pour Python.

Avis

  1. Il n'est pas entièrement testé, juste pour votre référence.

  2. Il manque actuellement des mécanismes de verrouillage/sem pour le multitraitement.

Prograide.com

Prograide est une communauté de développeurs qui cherche à élargir la connaissance de la programmation au-delà de l'anglais.
Pour cela nous avons les plus grands doutes résolus en français et vous pouvez aussi poser vos propres questions ou résoudre celles des autres.

Powered by:

X