3 votes

Multiprocessing.Queue avec de grandes données provoque _wait_for_tstate_lock

Une exception est levée dans threading._wait_for_tstate_lock lorsque je transfère des données volumineuses entre un Process et un Thread via multiprocessing.Queue .

Mon exemple minimal de travail semble d'abord un peu complexe - désolé. Je vais vous expliquer. L'application originale charge un grand nombre de fichiers (pas si importants) dans la RAM. Ceci est fait dans un processus séparé pour économiser des ressources. Le thread principal du gui ne devrait pas geler.

  1. L'interface graphique démarre une Thread pour éviter que la boucle d'événements du gui ne se fige.

  2. Ce distinct Thread puis commence un Process qui devrait faire le travail.

a) Cette Thread instancie un multiprocess.Queue (attention, il s'agit d'une multiprocessing y no threading !)

b) Ceci est donné à la Process pour le partage des données de Process à la Thread .

  1. Le site Process effectue un travail (3 étapes) et .put() le résultat dans le multiprocessing.Queue .

  2. Lorsque le Process met fin à la Thread reprend et collecte les données de la Queue et le stocker dans son propre attribut MyThread.result .

  3. Le site Thread indique à la boucle/thread principal de l'interface graphique d'appeler une fonction de rappel si elle en a le temps.

  4. La fonction de rappel ( MyWindow::callback_thread_finished() ) obtenir les résultats de MyWindow.thread.result .

Le problème est que si les données mises dans le Queue c'est de faire quelque chose de gros que je ne comprends pas - la MyThread ne se termine jamais. Je dois fermer l'application via Strg+C.

J'ai reçu quelques conseils de la documentation. Mais mon problème est que je n'ai pas entièrement compris la documentation. Mais j'ai le sentiment que la clé de mes problèmes s'y trouve. Veuillez voir les deux encadrés rouges dans " Tuyaux et files d'attente "(documentation de Python 3.5). C'est la sortie complète

MyWindow::do_start()
Running MyThread...
Running MyProcess...
MyProcess stoppd.
^CProcess MyProcess-1:
Exception ignored in: <module 'threading' from '/usr/lib/python3.5/threading.py'>
Traceback (most recent call last):
  File "/usr/lib/python3.5/threading.py", line 1288, in _shutdown
    t.join()
  File "/usr/lib/python3.5/threading.py", line 1054, in join
    self._wait_for_tstate_lock()
  File "/usr/lib/python3.5/threading.py", line 1070, in _wait_for_tstate_lock
    elif lock.acquire(block, timeout):
KeyboardInterrupt
Traceback (most recent call last):
  File "/usr/lib/python3.5/multiprocessing/process.py", line 252, in _bootstrap
    util._exit_function()
  File "/usr/lib/python3.5/multiprocessing/util.py", line 314, in _exit_function
    _run_finalizers()
  File "/usr/lib/python3.5/multiprocessing/util.py", line 254, in _run_finalizers
    finalizer()
  File "/usr/lib/python3.5/multiprocessing/util.py", line 186, in __call__
    res = self._callback(*self._args, **self._kwargs)
  File "/usr/lib/python3.5/multiprocessing/queues.py", line 198, in _finalize_join
    thread.join()
  File "/usr/lib/python3.5/threading.py", line 1054, in join
    self._wait_for_tstate_lock()
  File "/usr/lib/python3.5/threading.py", line 1070, in _wait_for_tstate_lock
    elif lock.acquire(block, timeout):
KeyboardInterrupt

Voici l'exemple minimal de fonctionnement

#!/usr/bin/env python3

import multiprocessing
import threading
import time
import gi
gi.require_version('Gtk', '3.0')
from gi.repository import Gtk
from gi.repository import GLib

class MyThread (threading.Thread):
    """This thread just starts the process."""
    def __init__(self, callback):
        threading.Thread.__init__(self)
        self._callback = callback

    def run(self):
        print('Running MyThread...')
        self.result = []

        queue = multiprocessing.Queue()
        process = MyProcess(queue)
        process.start()
        process.join()

        while not queue.empty():
            process_result = queue.get()
            self.result.append(process_result)
        print('MyThread stoppd.')
        GLib.idle_add(self._callback)

class MyProcess (multiprocessing.Process):
    def __init__(self, queue):
        multiprocessing.Process.__init__(self)
        self.queue = queue

    def run(self):
        print('Running MyProcess...')
        for i in range(3):
            self.queue.put((i, 'x'*102048))
        print('MyProcess stoppd.')

class MyWindow (Gtk.Window):
    def __init__(self):
        Gtk.Window.__init__(self)
        self.connect('destroy', Gtk.main_quit)
        GLib.timeout_add(2000, self.do_start)

    def do_start(self):
        print('MyWindow::do_start()')
        # The process need to be started from a separate thread
        # to prevent the main thread (which is the gui main loop)
        # from freezing while waiting for the process result.
        self.thread = MyThread(self.callback_thread_finished)
        self.thread.start()

    def callback_thread_finished(self):
        result = self.thread.result
        for r in result:
            print('{} {}...'.format(r[0], r[1][:10]))

if __name__ == '__main__':
    win = MyWindow()
    win.show_all()
    Gtk.main()

Duplicata possible mais assez différent et IMO sans réponse pour ma situation : Thread._wait_for_tstate_lock() n'est jamais retourné .

Solution de rechange

Utilisation d'un Directeur en modifiant la ligne 22 en queue = multiprocessing.Manager().Queue() résoudre le problème. Mais je ne sais pas pourquoi. L'intention de cette question est de comprendre les choses derrière et pas seulement de faire fonctionner mon code. Même si je ne sais pas vraiment ce qu'est un Manager() et si cela a d'autres implications (causant des problèmes).

3voto

BlackJack Points 2889

D'après la deuxième case d'avertissement dans la documentation à laquelle vous vous référez, vous pouvez obtenir un blocage lorsque vous rejoignez un processus avant de traiter tous les éléments de la file d'attente. Donc, démarrer le processus et le rejoindre immédiatement et puis le traitement des éléments de la file d'attente se fait dans le mauvais ordre des étapes. Vous devez lancer le processus, puis recevoir les éléments, et ce n'est que lorsque tous les éléments sont reçus que vous pouvez appeler la méthode de jonction. Définissez une valeur sentinelle pour signaler que le processus a fini d'envoyer des données dans la file d'attente. None par exemple si cela ne peut pas être une valeur régulière que vous attendez du processus.

class MyThread(threading.Thread):
    """This thread just starts the process."""

    def __init__(self, callback):
        threading.Thread.__init__(self)
        self._callback = callback
        self.result = []

    def run(self):
        print('Running MyThread...')
        queue = multiprocessing.Queue()
        process = MyProcess(queue)
        process.start()
        while True:
            process_result = queue.get()
            if process_result is None:
                break
            self.result.append(process_result)
        process.join()
        print('MyThread stoppd.')
        GLib.idle_add(self._callback)

class MyProcess(multiprocessing.Process):

    def __init__(self, queue):
        multiprocessing.Process.__init__(self)
        self.queue = queue

    def run(self):
        print('Running MyProcess...')
        for i in range(3):
            self.queue.put((i, 'x' * 102048))
        self.queue.put(None)
        print('MyProcess stoppd.')

Prograide.com

Prograide est une communauté de développeurs qui cherche à élargir la connaissance de la programmation au-delà de l'anglais.
Pour cela nous avons les plus grands doutes résolus en français et vous pouvez aussi poser vos propres questions ou résoudre celles des autres.

Powered by:

X