164 votes

Interruptions du clavier avec le pool de multitraitement de python

Comment puis-je gérer les événements KeyboardInterrupt avec les Pools multiprocesseurs de python ? Voici un exemple simple :

from multiprocessing import Pool
from time import sleep
from sys import exit

def slowly_square(i):
    sleep(1)
    return i*i

def go():
    pool = Pool(8)
    try:
        results = pool.map(slowly_square, range(40))
    except KeyboardInterrupt:
        # **** THIS PART NEVER EXECUTES. ****
        pool.terminate()
        print "You cancelled the program!"
        sys.exit(1)
    print "\nFinally, here are the results: ", results

if __name__ == "__main__":
    go()

Lors de l'exécution du code ci-dessus, le KeyboardInterrupt se lève lorsque j'appuie sur ^C mais le processus se bloque tout simplement à ce moment-là et je dois le tuer en externe.

Je veux pouvoir appuyer sur ^C à tout moment et faire en sorte que tous les processus se terminent gracieusement.

0 votes

J'ai résolu mon problème en utilisant psutil, vous pouvez voir la solution ici : stackoverflow.com/questions/32160054/

144voto

Glenn Maynard Points 24451

C'est un bug de Python. Lors de l'attente d'une condition dans threading.Condition.wait(), KeyboardInterrupt n'est jamais envoyé. Repro :

import threading
cond = threading.Condition(threading.Lock())
cond.acquire()
cond.wait(None)
print "done"

L'exception KeyboardInterrupt ne sera pas délivrée avant le retour de wait(), qui ne revient jamais, de sorte que l'interruption ne se produit jamais. KeyboardInterrupt devrait presque certainement interrompre une condition d'attente.

Notez que cela ne se produit pas si un timeout est spécifié ; cond.wait(1) recevra l'interruption immédiatement. Une solution de contournement consiste donc à spécifier un délai d'attente. Pour ce faire, remplacez

    results = pool.map(slowly_square, range(40))

avec

    results = pool.map_async(slowly_square, range(40)).get(9999999)

ou similaire.

4 votes

Ce bogue figure-t-il quelque part dans le tracker officiel de Python ? J'ai du mal à le trouver mais je n'utilise probablement pas les meilleurs termes de recherche.

18 votes

Ce bogue a été classé comme [Issue 8296][1]. [1] : bugs.python.org/issue8296

1 votes

Voici un hack qui corrige pool.imap() de la même manière, rendant possible le Ctrl-C lors de l'itération sur imap. Attrapez l'exception et appelez pool.terminate() et votre programme se terminera. gist.github.com/626518

67voto

John Reese Points 145

D'après ce que j'ai découvert récemment, la meilleure solution consiste à configurer les processus de travail pour qu'ils ignorent complètement SIGINT, et à confier tout le code de nettoyage au processus parent. Cela résout le problème pour les processus travailleurs inactifs et occupés, et ne nécessite aucun code de gestion des erreurs dans vos processus enfants.

import signal

...

def init_worker():
    signal.signal(signal.SIGINT, signal.SIG_IGN)

...

def main()
    pool = multiprocessing.Pool(size, init_worker)

    ...

    except KeyboardInterrupt:
        pool.terminate()
        pool.join()

L'explication et le code d'exemple complet se trouvent à l'adresse suivante http://noswap.com/blog/python-multiprocessing-keyboardinterrupt/ y http://github.com/jreese/multiprocessing-keyboardinterrupt respectivement.

5 votes

Salut John. Ta solution n'accomplit pas la même chose que ma solution, malheureusement compliquée. Elle se cache derrière le time.sleep(10) dans le processus principal. Si vous supprimez cette mise en veille, ou si vous attendez que le processus tente de rejoindre le pool, ce que vous devez faire pour garantir que les travaux sont terminés, vous souffrez toujours du même problème, à savoir que le processus principal ne reçoit pas l'interruption KeyboardInterrupt alors qu'il attend un poll. join fonctionnement.

0 votes

Dans le cas où j'ai utilisé ce code en production, le time.sleep() faisait partie d'une boucle qui vérifiait l'état de chaque processus enfant, puis redémarrait certains processus en différé si nécessaire. Plutôt que d'attendre que tous les processus soient terminés, la méthode join() les vérifie individuellement, ce qui permet au processus maître de rester réactif.

2 votes

Il s'agissait donc plutôt d'une attente occupée (peut-être avec de petites périodes d'inactivité entre les vérifications) qui demandait l'achèvement du processus par une autre méthode plutôt que par la jointure ? Si c'est le cas, il serait peut-être préférable d'inclure ce code dans votre article de blog, puisque vous pouvez alors garantir que tous les travailleurs ont terminé avant de tenter de joindre.

33voto

Andrey Vlasovskikh Points 6903

Pour certaines raisons, seules les exceptions héritées de la base Exception sont traitées normalement. En guise de solution de contournement, vous pouvez relancer la classe KeyboardInterrupt comme un Exception instance :

from multiprocessing import Pool
import time

class KeyboardInterruptError(Exception): pass

def f(x):
    try:
        time.sleep(x)
        return x
    except KeyboardInterrupt:
        raise KeyboardInterruptError()

def main():
    p = Pool(processes=4)
    try:
        print 'starting the pool map'
        print p.map(f, range(10))
        p.close()
        print 'pool map complete'
    except KeyboardInterrupt:
        print 'got ^C while pool mapping, terminating the pool'
        p.terminate()
        print 'pool is terminated'
    except Exception, e:
        print 'got exception: %r, terminating the pool' % (e,)
        p.terminate()
        print 'pool is terminated'
    finally:
        print 'joining pool processes'
        p.join()
        print 'join complete'
    print 'the end'

if __name__ == '__main__':
    main()

Normalement, vous devriez obtenir le résultat suivant :

staring the pool map
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
pool map complete
joining pool processes
join complete
the end

Donc si vous tapez ^C vous obtiendrez :

staring the pool map
got ^C while pool mapping, terminating the pool
pool is terminated
joining pool processes
join complete
the end

2 votes

Il semble que ce ne soit pas une solution complète. Si un KeyboardInterrupt est arrivé alors que multiprocessing effectue son propre échange de données IPC, alors le programme try..catch ne sera pas activé (évidemment).

0 votes

Vous pourriez remplacer raise KeyboardInterruptError avec un return . Il suffit de s'assurer que le processus enfant se termine dès que KeyboardInterrupt est reçu. La valeur de retour semble être ignorée, en main toujours la réception de la KeyboardInterrupt.

10voto

igco Points 51

Cette structure simple fonctionne généralement pour Ctrl - C sur la piscine :

def signal_handle(_signal, frame):
    print "Stopping the Jobs."

signal.signal(signal.SIGINT, signal_handle)

Comme cela a été dit dans quelques posts similaires :

Capturer l'interruption du clavier en Python sans try-except

1 votes

Cette opération devrait également être effectuée sur chacun des processus de travail, et pourrait encore échouer si l'interruption clavier est déclenchée pendant l'initialisation de la bibliothèque multiprocessus.

4voto

bboe Points 1387

J'ai trouvé, pour l'instant, que la meilleure solution est de ne pas utiliser la fonctionnalité multiprocessing.pool mais plutôt de développer votre propre fonctionnalité de pool. J'ai fourni un exemple démontrant l'erreur avec apply_async ainsi qu'un exemple montrant comment éviter complètement l'utilisation de la fonctionnalité de pool.

http://www.bryceboe.com/2010/08/26/python-multiprocessing-and-keyboardinterrupt/

0 votes

Ça marche comme un charme. C'est une solution propre et pas une sorte de hack (/moi pense).btw, l'astuce avec .get(99999) comme proposé par d'autres nuit gravement aux performances.

0 votes

Je n'ai pas remarqué de pénalité de performance en utilisant un délai d'attente, bien que j'aie utilisé 9999 au lieu de 999999. L'exception est lorsqu'une exception qui n'hérite pas de la classe Exception est levée : il faut alors attendre que le délai d'attente soit atteint. La solution à ce problème est d'attraper toutes les exceptions (voir ma solution).

Prograide.com

Prograide est une communauté de développeurs qui cherche à élargir la connaissance de la programmation au-delà de l'anglais.
Pour cela nous avons les plus grands doutes résolus en français et vous pouvez aussi poser vos propres questions ou résoudre celles des autres.

Powered by:

X