Je suis en train de construire un chien de garde qui exécute un autre programme Python et qui, s'il ne trouve pas de check-in de l'un des threads, arrête tout le programme. Ceci afin qu'il puisse, éventuellement, prendre le contrôle des ports de communication nécessaires. Le code pour le timer est le suivant :
from multiprocessing import Process, Queue
from time import sleep
from copy import deepcopy
PATH_TO_FILE = r'.\test_program.py'
WATCHDOG_TIMEOUT = 2
class Watchdog:
def __init__(self, filepath, timeout):
self.filepath = filepath
self.timeout = timeout
self.threadIdQ = Queue()
self.knownThreads = {}
def start(self):
threadIdQ = self.threadIdQ
process = Process(target = self._executeFile)
process.start()
try:
while True:
unaccountedThreads = deepcopy(self.knownThreads)
# Empty queue since last wake. Add new thread IDs to knownThreads, and account for all known thread IDs
# in queue
while not threadIdQ.empty():
threadId = threadIdQ.get()
if threadId in self.knownThreads:
unaccountedThreads.pop(threadId, None)
else:
print('New threadId < {} > discovered'.format(threadId))
self.knownThreads[threadId] = False
# If there is a known thread that is unaccounted for, then it has either hung or crashed.
# Shut everything down.
if len(unaccountedThreads) > 0:
print('The following threads are unaccounted for:\n')
for threadId in unaccountedThreads:
print(threadId)
print('\nShutting down!!!')
break
else:
print('No unaccounted threads...')
sleep(self.timeout)
# Account for any exceptions thrown in the watchdog timer itself
except:
process.terminate()
raise
process.terminate()
def _executeFile(self):
with open(self.filepath, 'r') as f:
exec(f.read(), {'wdQueue' : self.threadIdQ})
if __name__ == '__main__':
wd = Watchdog(PATH_TO_FILE, WATCHDOG_TIMEOUT)
wd.start()
J'ai aussi un petit programme pour tester la fonctionnalité du chien de garde.
from time import sleep
from threading import Thread
from queue import SimpleQueue
Q_TO_Q_DELAY = 0.013
class QToQ:
def __init__(self, processQueue, threadQueue):
self.processQueue = processQueue
self.threadQueue = threadQueue
Thread(name='queueToQueue', target=self._run).start()
def _run(self):
pQ = self.processQueue
tQ = self.threadQueue
while True:
while not tQ.empty():
sleep(Q_TO_Q_DELAY)
pQ.put(tQ.get())
def fastThread(q):
while True:
print('Fast thread, checking in!')
q.put('fastID')
sleep(0.5)
def slowThread(q):
while True:
print('Slow thread, checking in...')
q.put('slowID')
sleep(1.5)
def hangThread(q):
print('Hanging thread, checked in')
q.put('hangID')
while True:
pass
print('Hello! I am a program that spawns threads!\n\n')
threadQ = SimpleQueue()
Thread(name='fastThread', target=fastThread, args=(threadQ,)).start()
Thread(name='slowThread', target=slowThread, args=(threadQ,)).start()
Thread(name='hangThread', target=hangThread, args=(threadQ,)).start()
QToQ(wdQueue, threadQ)
Comme vous pouvez le voir, j'ai besoin que les threads soient placés dans une queue.Queue, tandis qu'un objet séparé alimente lentement la sortie de la queue.Queue dans la queue de multitraitement. Si au lieu de cela, je place les threads directement dans la file d'attente du multitraitement, ou si je ne fais pas dormir l'objet QToQ entre les mises en attente, la file d'attente du multitraitement se bloquera, et semblera toujours vide du côté du chien de garde.
Maintenant, comme la file d'attente multitraitement est censée être thread et processus sûr, je ne peux que supposer que j'ai foiré quelque chose dans la mise en œuvre. Ma solution semble fonctionner, mais elle est aussi suffisamment compliquée pour que je pense devoir la corriger.
J'utilise Python 3.7.2, si cela a de l'importance.