151 votes

Comment utiliser une file d'attente multiprocesseur en Python ?

J'ai beaucoup de mal à comprendre comment la file d'attente de multitraitement fonctionne sur python et comment l'implémenter. Disons que j'ai deux modules python qui accèdent aux données d'un fichier partagé, appelons ces deux modules un écrivain et un lecteur. Mon plan est de faire en sorte que le lecteur et le rédacteur placent des requêtes dans deux files d'attente multitraitement séparées, puis qu'un troisième processus place ces requêtes dans une boucle et les exécute comme telles.

Mon principal problème est que je ne sais vraiment pas comment implémenter multiprocessing.queue correctement, vous ne pouvez pas vraiment instancier l'objet pour chaque processus puisqu'ils seront des files d'attente séparées, comment vous assurez-vous que tous les processus se rapportent à une file d'attente partagée (ou dans ce cas, des files d'attente).

182voto

Mike Pennington Points 16712

Mon principal problème est que je ne sais vraiment pas comment implémenter multiprocessing.queue correctement, vous ne pouvez pas vraiment instancier l'objet pour chaque processus puisqu'ils seront des files d'attente séparées, comment vous assurez-vous que tous les processus se rapportent à une file d'attente partagée (ou dans ce cas, des files d'attente).

Voici un exemple simple d'un lecteur et d'un écrivain partageant une seule file d'attente... Le scripteur envoie un tas d'entiers au lecteur ; quand le scripteur n'a plus de nombres, il envoie 'DONE', ce qui permet au lecteur de sortir de la boucle de lecture.

Vous pouvez créer autant de processus de lecture que vous le souhaitez...

from multiprocessing import Process, Queue
import time
import sys

def reader_proc(queue):
    """Read from the queue; this spawns as a separate Process"""
    while True:
        msg = queue.get()  # Read from the queue and do nothing
        if msg == "DONE":
            break

def writer(count, num_of_reader_procs, queue):
    """Write integers into the queue.  A reader_proc() will read them from the queue"""
    for ii in range(0, count):
        queue.put(ii)  # Put 'count' numbers into queue

    ### Tell all readers to stop...
    for ii in range(0, num_of_reader_procs):
        queue.put("DONE")

def start_reader_procs(qq, num_of_reader_procs):
    """Start the reader processes and return all in a list to the caller"""
    all_reader_procs = list()
    for ii in range(0, num_of_reader_procs):
        ### reader_p() reads from qq as a separate process...
        ###    you can spawn as many reader_p() as you like
        ###    however, there is usually a point of diminishing returns
        reader_p = Process(target=reader_proc, args=((qq),))
        reader_p.daemon = True
        reader_p.start()  # Launch reader_p() as another proc

        all_reader_procs.append(reader_p)

    return all_reader_procs

if __name__ == "__main__":
    num_of_reader_procs = 2
    qq = Queue()  # writer() writes to qq from _this_ process
    for count in [10**4, 10**5, 10**6]:
        assert num_of_reader_procs > 0
        assert num_of_reader_procs < 4
        all_reader_procs = start_reader_procs(qq, num_of_reader_procs)

        writer(count, len(all_reader_procs), qq)  # Queue stuff to all reader_p()
        print("All reader processes are pulling numbers from the queue...")

        _start = time.time()
        for idx, a_reader_proc in enumerate(all_reader_procs):
            print("    Waiting for reader_p.join() index %s" % idx)
            a_reader_proc.join()  # Wait for a_reader_proc() to finish

            print("        reader_p() idx:%s is done" % idx)

        print(
            "Sending {0} integers through Queue() took {1} seconds".format(
                count, (time.time() - _start)
            )
        )
        print("")

29voto

Joe Holloway Points 11122

Voici une utilisation très simple de multiprocessing.Queue y multiprocessing.Process qui permet aux appelants d'envoyer un "événement" accompagné d'arguments à un processus distinct qui envoie l'événement à une méthode "do_" du processus. (Python 3.4+)

import multiprocessing as mp
import collections

Msg = collections.namedtuple('Msg', ['event', 'args'])

class BaseProcess(mp.Process):
    """A process backed by an internal queue for simple one-way message passing.
    """
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.queue = mp.Queue()

    def send(self, event, *args):
        """Puts the event and args as a `Msg` on the queue
        """
       msg = Msg(event, args)
       self.queue.put(msg)

    def dispatch(self, msg):
        event, args = msg

        handler = getattr(self, "do_%s" % event, None)
        if not handler:
            raise NotImplementedError("Process has no handler for [%s]" % event)

        handler(*args)

    def run(self):
        while True:
            msg = self.queue.get()
            self.dispatch(msg)

Utilisation :

class MyProcess(BaseProcess):
    def do_helloworld(self, arg1, arg2):
        print(arg1, arg2)

if __name__ == "__main__":
    process = MyProcess()
    process.start()
    process.send('helloworld', 'hello', 'world')

El send se produit dans le processus parent, le do_* se produit dans le processus enfant.

J'ai laissé de côté toute gestion d'exception qui interromprait évidemment la boucle d'exécution et quitterait le processus enfant. Vous pouvez également le personnaliser en remplaçant run pour contrôler le blocage ou autre chose.

Cela n'est vraiment utile que dans les situations où vous avez un seul processus de travail, mais je pense que c'est une réponse pertinente à cette question pour démontrer un scénario commun avec un peu plus d'orientation objet.

23voto

Nick B. Points 111

J'ai consulté de nombreuses réponses sur stack overflow et sur le web en essayant de mettre en place un moyen de faire du multitraitement en utilisant des files d'attente pour faire passer de grands cadres de données pandas. Il m'a semblé que toutes les réponses répétaient le même type de solutions sans tenir compte de la multitude de cas limites que l'on peut rencontrer lors de la mise en place de ce type de calculs. Le problème est qu'il y a beaucoup de choses en jeu en même temps. Le nombre de tâches, le nombre de travailleurs, la durée de chaque tâche et les exceptions possibles pendant l'exécution de la tâche. Tous ces éléments rendent la synchronisation délicate et la plupart des réponses n'indiquent pas comment s'y prendre. Voici donc mon point de vue après avoir bricolé pendant quelques heures, en espérant qu'il sera suffisamment générique pour que la plupart des gens le trouvent utile.

Quelques réflexions avant tout exemple de codage. Depuis queue.Empty o queue.qsize() ou toute autre méthode similaire n'est pas fiable pour le contrôle de flux, tout code de ce type

while True:
    try:
        task = pending_queue.get_nowait()
    except queue.Empty:
        break

est bidon. Cela tuera le travailleur même si, quelques millisecondes plus tard, une autre tâche apparaît dans la file d'attente. Le travailleur ne s'en remettra pas et, après un certain temps, TOUS les travailleurs disparaîtront car ils trouveront au hasard la file d'attente momentanément vide. Le résultat final sera que la fonction principale de multiprocessing (celle avec le join() sur les processus) reviendra sans que toutes les tâches soient terminées. Sympa. Bonne chance pour déboguer cela si vous avez des milliers de tâches et que quelques-unes sont manquantes.

L'autre problème est l'utilisation de valeurs sentinelles. De nombreuses personnes ont suggéré d'ajouter une valeur sentinelle dans la file d'attente pour signaler la fin de la file. Mais pour le signaler à qui exactement ? S'il y a N travailleurs, en supposant que N est le nombre de cœurs disponibles, alors une seule valeur sentinelle ne signalera la fin de la file d'attente qu'à un seul travailleur. Tous les autres travailleurs resteront en attente d'un travail supplémentaire alors qu'il n'y en a plus. Les exemples typiques que j'ai vus sont

while True:
    task = pending_queue.get()
    if task == SOME_SENTINEL_VALUE:
        break

Un travailleur obtiendra la valeur sentinelle tandis que les autres attendront indéfiniment. Aucun article que j'ai trouvé ne mentionne que vous devez soumettre la valeur sentinelle à la file d'attente AU MOINS autant de fois que vous avez de travailleurs afin que TOUS les travailleurs l'obtiennent.

L'autre problème est le traitement des exceptions pendant l'exécution des tâches. Là encore, il faut les attraper et les gérer. De plus, si vous avez un completed_tasks vous devez compter de manière indépendante et déterministe combien d'éléments se trouvent dans la file d'attente avant de décider que le travail est terminé. Encore une fois, se fier à la taille des files d'attente est voué à l'échec et donne des résultats inattendus.

Dans l'exemple ci-dessous, le par_proc() recevra une liste de tâches comprenant les fonctions avec lesquelles ces tâches doivent être exécutées, ainsi que les arguments et valeurs nommés.

import multiprocessing as mp
import dill as pickle
import queue
import time
import psutil

SENTINEL = None

def do_work(tasks_pending, tasks_completed):
    # Get the current worker's name
    worker_name = mp.current_process().name

    while True:
        try:
            task = tasks_pending.get_nowait()
        except queue.Empty:
            print(worker_name + ' found an empty queue. Sleeping for a while before checking again...')
            time.sleep(0.01)
        else:
            try:
                if task == SENTINEL:
                    print(worker_name + ' no more work left to be done. Exiting...')
                    break

                print(worker_name + ' received some work... ')
                time_start = time.perf_counter()
                work_func = pickle.loads(task['func'])
                result = work_func(**task['task'])
                tasks_completed.put({work_func.__name__: result})
                time_end = time.perf_counter() - time_start
                print(worker_name + ' done in {} seconds'.format(round(time_end, 5)))
            except Exception as e:
                print(worker_name + ' task failed. ' + str(e))
                tasks_completed.put({work_func.__name__: None})

def par_proc(job_list, num_cpus=None):

    # Get the number of cores
    if not num_cpus:
        num_cpus = psutil.cpu_count(logical=False)

    print('* Parallel processing')
    print('* Running on {} cores'.format(num_cpus))

    # Set-up the queues for sending and receiving data to/from the workers
    tasks_pending = mp.Queue()
    tasks_completed = mp.Queue()

    # Gather processes and results here
    processes = []
    results = []

    # Count tasks
    num_tasks = 0

    # Add the tasks to the queue
    for job in job_list:
        for task in job['tasks']:
            expanded_job = {}
            num_tasks = num_tasks + 1
            expanded_job.update({'func': pickle.dumps(job['func'])})
            expanded_job.update({'task': task})
            tasks_pending.put(expanded_job)

    # Use as many workers as there are cores (usually chokes the system so better use less)
    num_workers = num_cpus

    # We need as many sentinels as there are worker processes so that ALL processes exit when there is no more
    # work left to be done.
    for c in range(num_workers):
        tasks_pending.put(SENTINEL)

    print('* Number of tasks: {}'.format(num_tasks))

    # Set-up and start the workers
    for c in range(num_workers):
        p = mp.Process(target=do_work, args=(tasks_pending, tasks_completed))
        p.name = 'worker' + str(c)
        processes.append(p)
        p.start()

    # Gather the results
    completed_tasks_counter = 0
    while completed_tasks_counter < num_tasks:
        results.append(tasks_completed.get())
        completed_tasks_counter = completed_tasks_counter + 1

    for p in processes:
        p.join()

    return results

Et voici un test pour exécuter le code ci-dessus contre

def test_parallel_processing():
    def heavy_duty1(arg1, arg2, arg3):
        return arg1 + arg2 + arg3

    def heavy_duty2(arg1, arg2, arg3):
        return arg1 * arg2 * arg3

    task_list = [
        {'func': heavy_duty1, 'tasks': [{'arg1': 1, 'arg2': 2, 'arg3': 3}, {'arg1': 1, 'arg2': 3, 'arg3': 5}]},
        {'func': heavy_duty2, 'tasks': [{'arg1': 1, 'arg2': 2, 'arg3': 3}, {'arg1': 1, 'arg2': 3, 'arg3': 5}]},
    ]

    results = par_proc(task_list)

    job1 = sum([y for x in results if 'heavy_duty1' in x.keys() for y in list(x.values())])
    job2 = sum([y for x in results if 'heavy_duty2' in x.keys() for y in list(x.values())])

    assert job1 == 15
    assert job2 == 21

plus un autre avec quelques exceptions

def test_parallel_processing_exceptions():
    def heavy_duty1_raises(arg1, arg2, arg3):
        raise ValueError('Exception raised')
        return arg1 + arg2 + arg3

    def heavy_duty2(arg1, arg2, arg3):
        return arg1 * arg2 * arg3

    task_list = [
        {'func': heavy_duty1_raises, 'tasks': [{'arg1': 1, 'arg2': 2, 'arg3': 3}, {'arg1': 1, 'arg2': 3, 'arg3': 5}]},
        {'func': heavy_duty2, 'tasks': [{'arg1': 1, 'arg2': 2, 'arg3': 3}, {'arg1': 1, 'arg2': 3, 'arg3': 5}]},
    ]

    results = par_proc(task_list)

    job1 = sum([y for x in results if 'heavy_duty1' in x.keys() for y in list(x.values())])
    job2 = sum([y for x in results if 'heavy_duty2' in x.keys() for y in list(x.values())])

    assert not job1
    assert job2 == 21

J'espère que cela vous aidera.

9voto

Jean Points 99

En " from queue import Queue "il n'y a pas de module appelé queue au lieu de multiprocessing doit être utilisé. Par conséquent, il devrait ressembler à " from multiprocessing import Queue "

4voto

changyuheng Points 577

Je viens de réaliser un exemple simple et général pour démontrer le passage d'un message sur une file d'attente entre 2 programmes autonomes. Il ne répond pas directement à la question de l'OP mais devrait être suffisamment clair pour indiquer le concept.

Serveur :

multiprocessing-queue-manager-server.py

import asyncio
import concurrent.futures
import multiprocessing
import multiprocessing.managers
import queue
import sys
import threading
from typing import Any, AnyStr, Dict, Union

class QueueManager(multiprocessing.managers.BaseManager):

    def get_queue(self, ident: Union[AnyStr, int, type(None)] = None) -> multiprocessing.Queue:
        pass

def get_queue(ident: Union[AnyStr, int, type(None)] = None) -> multiprocessing.Queue:
    global q

    if not ident in q:
        q[ident] = multiprocessing.Queue()

    return q[ident]

q: Dict[Union[AnyStr, int, type(None)], multiprocessing.Queue] = dict()
delattr(QueueManager, 'get_queue')

def init_queue_manager_server():
    if not hasattr(QueueManager, 'get_queue'):
        QueueManager.register('get_queue', get_queue)

def serve(no: int, term_ev: threading.Event):
    manager: QueueManager
    with QueueManager(authkey=QueueManager.__name__.encode()) as manager:
        print(f"Server address {no}: {manager.address}")

        while not term_ev.is_set():
            try:
                item: Any = manager.get_queue().get(timeout=0.1)
                print(f"Client {no}: {item} from {manager.address}")
            except queue.Empty:
                continue

async def main(n: int):
    init_queue_manager_server()
    term_ev: threading.Event = threading.Event()
    executor: concurrent.futures.ThreadPoolExecutor = concurrent.futures.ThreadPoolExecutor()

    i: int
    for i in range(n):
        asyncio.ensure_future(asyncio.get_running_loop().run_in_executor(executor, serve, i, term_ev))

    # Gracefully shut down
    try:
        await asyncio.get_running_loop().create_future()
    except asyncio.CancelledError:
        term_ev.set()
        executor.shutdown()
        raise

if __name__ == '__main__':
    asyncio.run(main(int(sys.argv[1])))

Client :

multiprocessing-queue-manager-client.py

import multiprocessing
import multiprocessing.managers
import os
import sys
from typing import AnyStr, Union

class QueueManager(multiprocessing.managers.BaseManager):

    def get_queue(self, ident: Union[AnyStr, int, type(None)] = None) -> multiprocessing.Queue:
        pass

delattr(QueueManager, 'get_queue')

def init_queue_manager_client():
    if not hasattr(QueueManager, 'get_queue'):
        QueueManager.register('get_queue')

def main():
    init_queue_manager_client()

    manager: QueueManager = QueueManager(sys.argv[1], authkey=QueueManager.__name__.encode())
    manager.connect()

    message = f"A message from {os.getpid()}"
    print(f"Message to send: {message}")
    manager.get_queue().put(message)

if __name__ == '__main__':
    main()

Utilisation

Serveur :

$ python3 multiprocessing-queue-manager-server.py N

N est un nombre entier indiquant combien de serveurs doivent être créés. Copiez l'un des <server-address-N> sortie par le serveur et en faire le premier argument de chaque multiprocessing-queue-manager-client.py .

Client :

python3 multiprocessing-queue-manager-client.py <server-address-1>

Résultat

Serveur :

Client 1: <item> from <server-address-1>

Gist : https://gist.github.com/89062d639e40110c61c2f88018a8b0e5


UPD : Création d'un paquet aquí .

Serveur :

import ipcq

with ipcq.QueueManagerServer(address=ipcq.Address.AUTO, authkey=ipcq.AuthKey.AUTO) as server:
    server.get_queue().get()

Client :

import ipcq

client = ipcq.QueueManagerClient(address=ipcq.Address.AUTO, authkey=ipcq.AuthKey.AUTO)
client.get_queue().put('a message')

enter image description here

Prograide.com

Prograide est une communauté de développeurs qui cherche à élargir la connaissance de la programmation au-delà de l'anglais.
Pour cela nous avons les plus grands doutes résolus en français et vous pouvez aussi poser vos propres questions ou résoudre celles des autres.

Powered by:

X