38 votes

Python 3 : détection des avertissements pendant le multitraitement

Trop long, je n'ai pas lu

El warnings.catch_warnings() le gestionnaire de contexte est pas de sécurité pour les fils . Comment l'utiliser dans un environnement de traitement parallèle ?

Contexte

Le code ci-dessous résout un problème de maximisation en utilisant le traitement parallèle avec l'outil Python multiprocessing module. Il prend une liste de widgets (immuables), les partitionne (voir Multitraitement efficace de maximisation massive par force brute en Python 3 ), trouve les maxima ("finalistes") de toutes les partitions, puis trouve le maximum ("champion") de ces "finalistes". Si je comprends correctement mon propre code (et je ne serais pas ici si c'était le cas), je partage la mémoire avec tous les processus enfants pour leur donner les widgets d'entrée, et multiprocessing utilise un pipe au niveau du système d'exploitation et le pickling pour renvoyer les widgets finalistes au processus principal lorsque les travailleurs ont terminé.

Source du problème

Je veux attraper les avertissements de widgets redondants causés par la ré-instauration des widgets après le dépiquage. qui se produit lorsque les widgets sortent du tuyau inter-processus. Lorsque les objets widgets s'instancient, ils valident leurs propres données, en émettant des avertissements de la norme Python warnings pour indiquer à l'utilisateur de l'application que le widget soupçonne un problème avec les données d'entrée de l'utilisateur. Parce que le dépiquage provoque l'instanciation des objets, ma compréhension du code implique que chaque objet widget est réinstancié exactement une fois si et seulement si il est finaliste après sa sortie du tuyau -- voir la section suivante pour voir pourquoi ce n'est pas correct.

Les widgets ont déjà été créés avant d'être frottés, de sorte que l'utilisateur est déjà douloureusement conscient des erreurs d'entrée qu'il a commises et ne veut pas en entendre parler à nouveau. Ce sont les avertissements que j'aimerais attraper avec la fonction warnings du module catch_warnings() gestionnaire de contexte (c'est-à-dire un with ).

Solutions ratées

Lors de mes tests, j'ai déterminé que les avertissements superflus étaient émis à n'importe quel moment entre ce que j'ai étiqueté comme suit Ligne A y Ligne B . Ce qui me surprend, c'est que les alertes sont émises dans des endroits autres que les environs output_queue.get() . Cela implique pour moi que multiprocessing envoie les widgets aux travailleurs en utilisant le décapage.

Le résultat est que la mise en place d'un gestionnaire de contexte créé par warnings.catch_warnings() même autour de tout, de Ligne A a Ligne B et le fait de définir le bon filtre d'avertissements dans ce contexte ne permet pas d'identifier les avertissements. Cela implique pour moi que les avertissements sont émis dans les processus de travail. En plaçant ce gestionnaire de contexte autour du code du travailleur, les avertissements ne sont pas non plus détectés.

Le code

Cet exemple omet le code permettant de décider si la taille du problème est trop petite pour se préoccuper de la bifurcation des processus, de l'importation du multiprocessing et de la définition de l'algorithme de l'utilisateur. my_frobnal_counter y my_load_balancer .

"Call `frobnicate(list_of_widgets)` to get the widget with the most frobnals"

def frobnicate_parallel_worker(widgets, output_queue):
    resultant_widget = max(widgets, key=my_frobnal_counter)
    output_queue.put(resultant_widget)

def frobnicate_parallel(widgets):
    output_queue = multiprocessing.Queue()
    # partitions: Generator yielding tuples of sets
    partitions = my_load_balancer(widgets)
    processes = []
    # Line A: Possible start of where the warnings are coming from.
    for partition in partitions:
        p = multiprocessing.Process(
                 target=frobnicate_parallel_worker,
                 args=(partition, output_queue))
        processes.append(p)
        p.start()
    finalists = []
    for p in processes:
        finalists.append(output_queue.get())
    # Avoid deadlocks in Unix by draining queue before joining processes
    for p in processes:
        p.join()
    # Line B: Warnings no longer possible after here.
    return max(finalists, key=my_frobnal_counter)

4 votes

> frobnicate C'est un mot si merveilleux.

5 votes

J'aimerais que nous ayons plus de questions bien formatées comme celle-ci.

1 votes

Puisque vous dites partitions donne des générateurs, est partition un générateur dans cet exemple ? Si c'est le cas, ce n'est pas un problème de décapage puisque les générateurs ne peuvent pas être décapés (et le code ne fonctionnerait pas sous Windows).

2voto

dnozay Points 3672

Vous pouvez essayer de remplacer le Process.run méthode à utiliser warnings.catch_warnings .

>>> from multiprocessing import Process
>>> 
>>> def yell(text):
...    import warnings
...    print 'about to yell %s' % text
...    warnings.warn(text)
... 
>>> class CustomProcess(Process):
...    def run(self, *args, **kwargs):
...       import warnings
...       with warnings.catch_warnings():
...          warnings.simplefilter("ignore")
...          return Process.run(self, *args, **kwargs)
... 
>>> if __name__ == '__main__':
...    quiet = CustomProcess(target=yell, args=('...not!',))
...    quiet.start()
...    quiet.join()
...    noisy = Process(target=yell, args=('AAAAAAaaa!',))
...    noisy.start()
...    noisy.join()
... 
about to yell ...not!
about to yell AAAAAAaaa!
__main__:4: UserWarning: AAAAAAaaa!
>>> 

ou vous pouvez utiliser certains des internes... ( __warningregistry__ )

>>> from multiprocessing import Process
>>> import exceptions
>>> def yell(text):
...    import warnings
...    print 'about to yell %s' % text
...    warnings.warn(text)
...    # not filtered
...    warnings.warn('complimentary second warning.')
... 
>>> WARNING_TEXT = 'AAAAaaaaa!'
>>> WARNING_TYPE = exceptions.UserWarning
>>> WARNING_LINE = 4
>>> 
>>> class SelectiveProcess(Process):
...    def run(self, *args, **kwargs):
...       registry = globals().setdefault('__warningregistry__', {})
...       registry[(WARNING_TEXT, WARNING_TYPE, WARNING_LINE)] = True
...       return Process.run(self, *args, **kwargs)
... 
>>> if __name__ == '__main__':
...    p = SelectiveProcess(target=yell, args=(WARNING_TEXT,))
...    p.start()
...    p.join()
... 
about to yell AAAAaaaaa!
__main__:6: UserWarning: complimentary second warning.
>>>

2voto

max Points 6673

Le dépiquage ne causerait pas le __init__ pour être exécuté deux fois. J'ai exécuté le code suivant sous Windows, et cela ne se produit pas (chaque __init__ est exécuté précisément une fois).

Par conséquent, vous devez nous fournir le code de my_load_balancer et de la classe des widgets. À ce stade, votre question ne fournit tout simplement pas assez d'informations.

Au hasard, vous pourriez vérifier si my_load_balancer fait des copies des widgets, ce qui fait qu'ils sont instanciés une fois de plus.

import multiprocessing
import collections

"Call `frobnicate(list_of_widgets)` to get the widget with the most frobnals"

def my_load_balancer(widgets):
    partitions = tuple(set() for _ in range(8))
    for i, widget in enumerate(widgets):
        partitions[i % 8].add(widget)
    for partition in partitions:
        yield partition

def my_frobnal_counter(widget):
    return widget.id

def frobnicate_parallel_worker(widgets, output_queue):
    resultant_widget = max(widgets, key=my_frobnal_counter)
    output_queue.put(resultant_widget)

def frobnicate_parallel(widgets):
    output_queue = multiprocessing.Queue()
    # partitions: Generator yielding tuples of sets
    partitions = my_load_balancer(widgets)
    processes = []
    # Line A: Possible start of where the warnings are coming from.
    for partition in partitions:
        p = multiprocessing.Process(
                 target=frobnicate_parallel_worker,
                 args=(partition, output_queue))
        processes.append(p)
        p.start()
    finalists = []
    for p in processes:
        finalists.append(output_queue.get())
    # Avoid deadlocks in Unix by draining queue before joining processes
    for p in processes:
        p.join()
    # Line B: Warnings no longer possible after here.
    return max(finalists, key=my_frobnal_counter)

class Widget:
    id = 0
    def __init__(self):
        print('initializing Widget {}'.format(self.id))
        self.id = Widget.id
        Widget.id += 1

    def __str__(self):
        return str(self.id)

    def __repr__(self):
        return str(self)

def main():

    widgets = [Widget() for _ in range(16)]
    result = frobnicate_parallel(widgets)
    print(result.id)

if __name__ == '__main__':
    main()

1voto

wkschwartz Points 683

Des années plus tard, j'ai enfin une solution (trouvée en travaillant sur un problème sans rapport). J'ai testé ceci sur Python 3.7, 3.8, et 3.9.

Temporairement patch sys.warnoptions avec la liste vide [] . Vous n'avez besoin de faire cela qu'autour de l'appel à process.start() . sys.warnoptions est documenté comme un détail d'implémentation que vous ne devriez pas modifier manuellement ; l'option recommandations officielles sont d'utiliser les fonctions de la warnings et de définir PYTHONWARNINGS en os.environ . Cela ne fonctionne pas. La seule chose qui semble fonctionner est Parcheando. sys.warnoptions . Dans un test, vous pouvez effectuer les opérations suivantes :

import multiprocessing
from unittest.mock import patch
p = multiprocessing.Process(target=my_function)
with patch('sys.warnoptions', []):
    p.start()
p.join()

Si vous ne voulez pas utiliser unittest.mock il suffit de le rafistoler à la main :

import multiprocessing
import sys
p = multiprocessing.Process(target=my_function)
old_warnoptions = sys.warnoptions
try:
    sys.warnoptions = []
    p.start()
finally:
    sys.warnoptions = old_warnoptions
p.join()

Prograide.com

Prograide est une communauté de développeurs qui cherche à élargir la connaissance de la programmation au-delà de l'anglais.
Pour cela nous avons les plus grands doutes résolus en français et vous pouvez aussi poser vos propres questions ou résoudre celles des autres.

Powered by:

X