304 votes

Comment dois-je me connecter lors de l'utilisation du multitraitement en Python?

Droit maintenant, j'ai un module central dans un cadre qui génère plusieurs processus à l'aide de Python 2.6 multiprocessing module. Parce qu'il utilise multiprocessing, il est au niveau du module multiprocessing-conscient journal, LOG = multiprocessing.get_logger(). Pour les docs, cet enregistreur a des processus de verrous partagés afin de ne pas corrompre les choses en sys.stderr (ou autre descripteur de fichier) par le fait d'avoir plusieurs processus écrivant simultanément.

Le problème que j'ai maintenant c'est que les autres modules dans le cadre ne sont pas multitraitement. La façon dont je le vois, j'ai besoin de faire toutes les dépendances sur cette centrale, le module multiprocessing-conscient de l'exploitation forestière. C'est gênant dans le cadre et, a fortiori, pour tous les clients de la structure. Existe-il des solutions de rechange je ne pense pas?

148voto

zzzeek Points 22617

Je viens de maintenant, a écrit un journal de gestionnaire de mon propre qui vient nourrit tout le processus parent par l'intermédiaire d'un tuyau. Je ne l'ai été de le tester pendant dix minutes, mais il semble fonctionner assez bien (à noter qu'il est codé en dur à RotatingFileHandler, qui est mon propre cas d'utilisation)

Mis à jour. Cette utilise maintenant une file d'attente pour la manipulation correcte de la concurrence, et aussi récupère les erreurs correctement. J'ai été en utilisant cette en production depuis plusieurs mois et la version actuelle ci-dessous fonctionne sans problème.

from logging.handlers import RotatingFileHandler
import multiprocessing, threading, logging, sys, traceback

class MultiProcessingLog(logging.Handler):
    def __init__(self, name, mode, maxsize, rotate):
        logging.Handler.__init__(self)

        self._handler = RotatingFileHandler(name, mode, maxsize, rotate)
        self.queue = multiprocessing.Queue(-1)

        t = threading.Thread(target=self.receive)
        t.daemon = True
        t.start()

    def setFormatter(self, fmt):
        logging.Handler.setFormatter(self, fmt)
        self._handler.setFormatter(fmt)

    def receive(self):
        while True:
            try:
                record = self.queue.get()
                self._handler.emit(record)
            except (KeyboardInterrupt, SystemExit):
                raise
            except EOFError:
                break
            except:
                traceback.print_exc(file=sys.stderr)

    def send(self, s):
        self.queue.put_nowait(s)

    def _format_record(self, record):
        # ensure that exc_info and args
        # have been stringified.  Removes any chance of
        # unpickleable things inside and possibly reduces
        # message size sent over the pipe
        if record.args:
            record.msg = record.msg % record.args
            record.args = None
        if record.exc_info:
            dummy = self.format(record)
            record.exc_info = None

        return record

    def emit(self, record):
        try:
            s = self._format_record(record)
            self.send(s)
        except (KeyboardInterrupt, SystemExit):
            raise
        except:
            self.handleError(record)

    def close(self):
        self._handler.close()
        logging.Handler.close(self)

89voto

vladr Points 34562

La seule façon de traiter avec ce de manière non intrusive, c'est le frai chaque processus de travail de sorte que son journal va à un autre descripteur de fichier (à disque ou à la pipe.) Idéalement, toutes les entrées de journal doit être horodaté. Votre contrôleur de processus peut alors (si vous utilisez les fichiers du disque) fusionner les fichiers journaux à la fin de la course (tri par date) ou, si l'utilisation de tuyaux (méthode recommandée), fusionner les entrées de journal à la volée à partir de tous les tuyaux d'en faire un journal (p. ex. périodiquement select de la tuyauterie' fd, effectuer de fusion-pour trier les entrées de journal, au ras journal centralisé, répéter.)

22voto

Ali Afshar Points 22836

Pourtant, une autre alternative pourrait être la non-basé sur des fichiers de journalisation des gestionnaires dans la journalisation paquet:

  • SocketHandler
  • DatagramHandler
  • SyslogHandler

(et les autres)

De cette façon, vous pouvez facilement avoir un démon de journaux quelque part que l'on peut écrire, en toute sécurité et serait en mesure de gérer correctement les résultats. Par exemple, un simple serveur socket qui vient de unpickles le message et émet à son propre rotation gestionnaire de fichier.

Le syslog gestionnaire de prendre soin de cela pour vous aussi. Bien sûr, vous pouvez utiliser votre propre instance de syslog pas le système.

14voto

ironhacker Points 91

Une variante des autres qui garde la thread de journalisation et de file d'attente distincte.

 """sample code for logging in subprocesses using multiprocessing

* Little handler magic - The main process uses loggers and handlers as normal.
* Only a simple handler is needed in the subprocess that feeds the queue.
* Original logger name from subprocess is preserved when logged in main
  process.
* As in the other implementations, a thread reads the queue and calls the
  handlers. Except in this implementation, the thread is defined outside of a
  handler, which makes the logger definitions simpler.
* Works with multiple handlers.  If the logger in the main process defines
  multiple handlers, they will all be fed records generated by the
  subprocesses loggers.

tested with Python 2.5 and 2.6 on Linux and Windows

"""

import os
import sys
import time
import traceback
import multiprocessing, threading, logging, sys

DEFAULT_LEVEL = logging.DEBUG

formatter = logging.Formatter("%(levelname)s: %(asctime)s - %(name)s - %(process)s - %(message)s")

class SubProcessLogHandler(logging.Handler):
    """handler used by subprocesses

    It simply puts items on a Queue for the main process to log.

    """

    def __init__(self, queue):
        logging.Handler.__init__(self)
        self.queue = queue

    def emit(self, record):
        self.queue.put(record)

class LogQueueReader(threading.Thread):
    """thread to write subprocesses log records to main process log

    This thread reads the records written by subprocesses and writes them to
    the handlers defined in the main process's handlers.

    """

    def __init__(self, queue):
        threading.Thread.__init__(self)
        self.queue = queue
        self.daemon = True

    def run(self):
        """read from the queue and write to the log handlers

        The logging documentation says logging is thread safe, so there
        shouldn't be contention between normal logging (from the main
        process) and this thread.

        Note that we're using the name of the original logger.

        """
        # Thanks Mike for the error checking code.
        while True:
            try:
                record = self.queue.get()
                # get the logger for this record
                logger = logging.getLogger(record.name)
                logger.callHandlers(record)
            except (KeyboardInterrupt, SystemExit):
                raise
            except EOFError:
                break
            except:
                traceback.print_exc(file=sys.stderr)

class LoggingProcess(multiprocessing.Process):

    def __init__(self, queue):
        multiprocessing.Process.__init__(self)
        self.queue = queue

    def _setupLogger(self):
        # create the logger to use.
        logger = logging.getLogger('test.subprocess')
        # The only handler desired is the SubProcessLogHandler.  If any others
        # exist, remove them. In this case, on Unix and Linux the StreamHandler
        # will be inherited.

        for handler in logger.handlers:
            # just a check for my sanity
            assert not isinstance(handler, SubProcessLogHandler)
            logger.removeHandler(handler)
        # add the handler
        handler = SubProcessLogHandler(self.queue)
        handler.setFormatter(formatter)
        logger.addHandler(handler)

        # On Windows, the level will not be inherited.  Also, we could just
        # set the level to log everything here and filter it in the main
        # process handlers.  For now, just set it from the global default.
        logger.setLevel(DEFAULT_LEVEL)
        self.logger = logger

    def run(self):
        self._setupLogger()
        logger = self.logger
        # and here goes the logging
        p = multiprocessing.current_process()
        logger.info('hello from process %s with pid %s' % (p.name, p.pid))


if __name__ == '__main__':
    # queue used by the subprocess loggers
    queue = multiprocessing.Queue()
    # Just a normal logger
    logger = logging.getLogger('test')
    handler = logging.StreamHandler()
    handler.setFormatter(formatter)
    logger.addHandler(handler)
    logger.setLevel(DEFAULT_LEVEL)
    logger.info('hello from the main process')
    # This thread will read from the subprocesses and write to the main log's
    # handlers.
    log_queue_reader = LogQueueReader(queue)
    log_queue_reader.start()
    # create the processes.
    for i in range(10):
        p = LoggingProcess(queue)
        p.start()
    # The way I read the multiprocessing warning about Queue, joining a
    # process before it has finished feeding the Queue can cause a deadlock.
    # Also, Queue.empty() is not realiable, so just make sure all processes
    # are finished.
    # active_children joins subprocesses when they're finished.
    while multiprocessing.active_children():
        time.sleep(.1)
 

11voto

schlamar Points 3108

Toutes les solutions actuelles sont trop couplée à la configuration de la journalisation à l'aide d'un gestionnaire. Ma solution a la suite de l'architecture et des fonctionnalités:

  • Vous pouvez utiliser toute la configuration de la journalisation vous voulez
  • L'abattage est réalisé dans un fil de démon
  • Arrêt sûr du démon à l'aide d'un gestionnaire de contexte
  • La Communication pour le thread de journalisation est fait par multiprocessing.Queue
  • Dans les sous-processus, logging.Logger (et déjà défini les instances) sont raccordées à envoyer tous les dossiers à la file d'attente
  • Nouveau: format d'assurer la traçabilité et de message avant de l'envoyer à la file d'attente pour éviter le décapage des erreurs

Code avec un exemple d'utilisation et de sortie peut être trouvé à l'Essentiel: https://gist.github.com/schlamar/7003737

Prograide.com

Prograide est une communauté de développeurs qui cherche à élargir la connaissance de la programmation au-delà de l'anglais.
Pour cela nous avons les plus grands doutes résolus en français et vous pouvez aussi poser vos propres questions ou résoudre celles des autres.

Powered by:

X