4 votes

Pourquoi ma file d'attente multiprocessus ne semble pas être thread safe ?

Je suis en train de construire un chien de garde qui exécute un autre programme Python et qui, s'il ne trouve pas de check-in de l'un des threads, arrête tout le programme. Ceci afin qu'il puisse, éventuellement, prendre le contrôle des ports de communication nécessaires. Le code pour le timer est le suivant :

from multiprocessing import Process, Queue
from time import sleep
from copy import deepcopy

PATH_TO_FILE = r'.\test_program.py'
WATCHDOG_TIMEOUT = 2

class Watchdog:

    def __init__(self, filepath, timeout):
        self.filepath = filepath
        self.timeout = timeout
        self.threadIdQ = Queue()
        self.knownThreads = {}

    def start(self):
        threadIdQ = self.threadIdQ

        process = Process(target = self._executeFile)
        process.start()
        try:
            while True:
                unaccountedThreads = deepcopy(self.knownThreads)

                # Empty queue since last wake. Add new thread IDs to knownThreads, and account for all known thread IDs
                # in queue
                while not threadIdQ.empty():
                    threadId = threadIdQ.get()
                    if threadId in self.knownThreads:
                        unaccountedThreads.pop(threadId, None)
                    else:
                        print('New threadId < {} > discovered'.format(threadId))
                        self.knownThreads[threadId] = False

                # If there is a known thread that is unaccounted for, then it has either hung or crashed.
                # Shut everything down.
                if len(unaccountedThreads) > 0:
                    print('The following threads are unaccounted for:\n')
                    for threadId in unaccountedThreads:
                        print(threadId)
                    print('\nShutting down!!!')
                    break
                else:
                    print('No unaccounted threads...')

                sleep(self.timeout)

        # Account for any exceptions thrown in the watchdog timer itself
        except:
            process.terminate()
            raise

        process.terminate()

    def _executeFile(self):
        with open(self.filepath, 'r') as f:
            exec(f.read(), {'wdQueue' : self.threadIdQ})

if __name__ == '__main__':
    wd = Watchdog(PATH_TO_FILE, WATCHDOG_TIMEOUT)
    wd.start()

J'ai aussi un petit programme pour tester la fonctionnalité du chien de garde.

from time import sleep
from threading import Thread
from queue import SimpleQueue

Q_TO_Q_DELAY = 0.013

class QToQ:

    def __init__(self, processQueue, threadQueue):
        self.processQueue = processQueue
        self.threadQueue = threadQueue
        Thread(name='queueToQueue', target=self._run).start()

    def _run(self):
        pQ = self.processQueue
        tQ = self.threadQueue
        while True:
            while not tQ.empty():
                sleep(Q_TO_Q_DELAY)
                pQ.put(tQ.get())

def fastThread(q):
    while True:
        print('Fast thread, checking in!')
        q.put('fastID')
        sleep(0.5)

def slowThread(q):
    while True:
        print('Slow thread, checking in...')
        q.put('slowID')
        sleep(1.5)

def hangThread(q):
    print('Hanging thread, checked in')
    q.put('hangID')
    while True:
        pass

print('Hello! I am a program that spawns threads!\n\n')

threadQ = SimpleQueue()

Thread(name='fastThread', target=fastThread, args=(threadQ,)).start()
Thread(name='slowThread', target=slowThread, args=(threadQ,)).start()
Thread(name='hangThread', target=hangThread, args=(threadQ,)).start()

QToQ(wdQueue, threadQ)

Comme vous pouvez le voir, j'ai besoin que les threads soient placés dans une queue.Queue, tandis qu'un objet séparé alimente lentement la sortie de la queue.Queue dans la queue de multitraitement. Si au lieu de cela, je place les threads directement dans la file d'attente du multitraitement, ou si je ne fais pas dormir l'objet QToQ entre les mises en attente, la file d'attente du multitraitement se bloquera, et semblera toujours vide du côté du chien de garde.

Maintenant, comme la file d'attente multitraitement est censée être thread et processus sûr, je ne peux que supposer que j'ai foiré quelque chose dans la mise en œuvre. Ma solution semble fonctionner, mais elle est aussi suffisamment compliquée pour que je pense devoir la corriger.

J'utilise Python 3.7.2, si cela a de l'importance.

1voto

quamrana Points 6411

Je soupçonne que test_program.py sortent.

J'ai changé les dernières lignes en ceci :

tq = threadQ
# tq = wdQueue    # option to send messages direct to WD

t1 = Thread(name='fastThread', target=fastThread, args=(tq,))
t2 = Thread(name='slowThread', target=slowThread, args=(tq,))
t3 = Thread(name='hangThread', target=hangThread, args=(tq,))

t1.start()
t2.start()
t3.start()
QToQ(wdQueue, threadQ)

print('Joining with threads...')
t1.join()
t2.join()
t3.join()

print('test_program exit')

Les appels à join() signifie que le programme de test ne se termine jamais tout seul puisqu'aucun des threads ne se termine.

Donc, tel quel, t3 se bloque et le programme watchdog le détecte et détecte le thread non pris en compte et arrête le programme de test.

Si t3 est retiré du programme ci-dessus, alors les deux autres threads se comportent bien et le programme chien de garde permet au programme de test de continuer indéfiniment.

Prograide.com

Prograide est une communauté de développeurs qui cherche à élargir la connaissance de la programmation au-delà de l'anglais.
Pour cela nous avons les plus grands doutes résolus en français et vous pouvez aussi poser vos propres questions ou résoudre celles des autres.

Powered by:

X