3 votes

Comment nettoyer les connexions après une KeyboardInterrupt dans python-trio

Ma classe, lorsqu'elle est connectée au serveur, doit immédiatement envoyer s'inscrire puis, lorsque la session est terminée, il doit envoyer la chaîne de caractères se déconnecter et nettoyer les prises. Voici mon code.

import trio

class test:

    _buffer = 8192
    _max_retry = 4

    def __init__(self, host='127.0.0.1', port=12345, usr='user', pwd='secret'):
        self.host = str(host)
        self.port = int(port)
        self.usr = str(usr)
        self.pwd = str(pwd)
        self._nl = b'\r\n'
        self._attempt = 0
        self._queue = trio.Queue(30)
        self._connected = trio.Event()
        self._end_session = trio.Event()

    @property
    def connected(self):
        return self._connected.is_set()

    async def _sender(self, client_stream, nursery):
        print('## sender: started!')
        q = self._queue
        while True:
            cmd = await q.get()
            print('## sending to the server:\n{!r}\n'.format(cmd))
            if self._end_session.is_set():
                nursery.cancel_scope.shield = True
                with trio.move_on_after(1):
                    await client_stream.send_all(cmd)
                nursery.cancel_scope.shield = False
            await client_stream.send_all(cmd)

    async def _receiver(self, client_stream, nursery):
        print('## receiver: started!')
        buff = self._buffer
        while True:
            data = await client_stream.receive_some(buff)
            if not data:
                print('## receiver: connection closed')
                self._end_session.set()
                break
            print('## got data from the server:\n{!r}'.format(data))

    async def _watchdog(self, nursery):
        await self._end_session.wait()
        await self._queue.put(self._logoff)
        self._connected.clear()
        nursery.cancel_scope.cancel()

    @property
    def _login(self, *a, **kw):
        nl = self._nl
        usr, pwd = self.usr, self.pwd
        return nl.join(x.encode() for x in ['Login', usr,pwd]) + 2*nl

    @property
    def _logoff(self, *a, **kw):
        nl = self._nl
        return nl.join(x.encode() for x in ['Logoff']) + 2*nl

    async def _connect(self):
        host, port = self.host, self.port
        print('## connecting to {}:{}'.format(host, port))
        try:
            client_stream = await trio.open_tcp_stream(host, port)
        except OSError as err:
            print('##', err)
        else:
            async with client_stream:
                self._end_session.clear()
                self._connected.set()
                self._attempt = 0
                # Sign in as soon as connected
                await self._queue.put(self._login)
                async with trio.open_nursery() as nursery:
                    print("## spawning watchdog...")
                    nursery.start_soon(self._watchdog, nursery)
                    print("## spawning sender...")
                    nursery.start_soon(self._sender, client_stream, nursery)
                    print("## spawning receiver...")
                    nursery.start_soon(self._receiver, client_stream, nursery)

    def connect(self):
        while self._attempt <= self._max_retry:
            try:
                trio.run(self._connect)
                trio.run(trio.sleep, 1)
                self._attempt += 1
            except KeyboardInterrupt:
                self._end_session.set()
                print('Bye bye...')
                break

tst = test()
tst.connect()

Ma logique ne fonctionne pas tout à fait. Elle fonctionne si je tue le netcat ma session ressemble donc à ce qui suit :

## connecting to 127.0.0.1:12345
## spawning watchdog...
## spawning sender...
## spawning receiver...
## receiver: started!
## sender: started!
## sending to the server:
b'Login\r\nuser\r\nsecret\r\n\r\n'

## receiver: connection closed
## sending to the server:
b'Logoff\r\n\r\n'

Il convient de noter que Logoff a été envoyée, bien que cela n'ait pas de sens ici puisque la connexion est déjà rompue à ce moment-là.

Cependant, mon objectif est de Logoff lorsque l'utilisateur KeyboardInterrupt . Dans ce cas, ma session ressemble à ceci :

## connecting to 127.0.0.1:12345
## spawning watchdog...
## spawning sender...
## spawning receiver...
## receiver: started!
## sender: started!
## sending to the server:
b'Login\r\nuser\r\nsecret\r\n\r\n'

Bye bye...

Il convient de noter que Logoff n'a pas été exclu.

Des idées ?

4voto

Nathaniel J. Smith Points 1209

Dans ce cas, l'arbre d'appel ressemble à quelque chose comme

connect
|
+- _connect*
   |
   +- _watchdog*
   |
   +- _sender*
   |
   +- _receiver*

En * indiquent les 4 tâches du trio. Les _connect est assis à l'extrémité du bloc de la crèche et attend que les tâches de l'enfant soient terminées. Les _watchdog est bloquée en await self._end_session.wait() , le _sender est bloquée en await q.get() et le _receiver est bloquée en await client_stream.receive_some(...) .

Lorsque vous appuyez sur Ctrl-C, la sémantique standard de Python veut que le code Python en cours d'exécution lève soudainement le bouton KeyboardInterrupt . Dans ce cas, vous avez 4 tâches différentes en cours d'exécution, donc l'une de ces opérations bloquées est choisie au hasard [1], et lève un KeyboardInterrupt . Cela signifie que plusieurs choses peuvent se produire :

  • Si _watchdog 's wait suivre les levées de fonds KeyboardInterrupt , alors le _watchdog quitte immédiatement la méthode, de sorte qu'elle n'essaye même pas d'envoyer le message logout . Ensuite, dans le cadre du déroulement de la pile, trio annule toutes les autres tâches, et une fois qu'elles se sont éteintes, la tâche KeyboardInterrupt continue à se propager jusqu'à ce qu'il atteigne votre finally bloc en connect . A ce stade, vous essayez de notifier la tâche watchdog à l'aide de la fonction self._end_session.set() mais il ne fonctionne plus, donc il ne s'en aperçoit pas.

  • Si _sender 's q.get() suivre les levées de fonds KeyboardInterrupt , alors le _sender sort immédiatement, donc même si la méthode _watchdog ne lui a pas demandé d'envoyer un message de déconnexion, il ne sera pas là pour le remarquer. Et dans tous les cas, trio procède à l'annulation du chien de garde et des tâches du récepteur, et les choses se déroulent comme indiqué ci-dessus.

  • Si _receiver 's receive_all suivre les levées de fonds KeyboardInterrupt ... la même chose se produit.

  • Petite subtilité : _connect peut également recevoir le KeyboardInterrupt qui fait la même chose : elle annule tous les enfants, puis attend qu'ils s'arrêtent avant d'autoriser la fonction KeyboardInterrupt pour continuer à se propager.

Si vous voulez attraper le contrôle-C de manière fiable et en faire quelque chose, le fait qu'il soit levé à un moment aléatoire est une véritable nuisance. La façon la plus simple de le faire est d'utiliser Soutien de Trio aux signaux de capture pour attraper le signal.SIGINT qui est la chose que Python convertit normalement en un signal KeyboardInterrupt . (Le "INT" signifie "interrupt".) Quelque chose comme :

async def _control_c_watcher(self):
    # This API is currently a little cumbersome, sorry, see
    # https://github.com/python-trio/trio/issues/354
    with trio.catch_signals({signal.SIGINT}) as batched_signal_aiter:
        async for _ in batched_signal_aiter:
            self._end_session.set()
            # We exit the loop, restoring the normal behavior of
            # control-C. This way hitting control-C once will try to
            # do a polite shutdown, but if that gets stuck the user
            # can hit control-C again to raise KeyboardInterrupt and
            # force things to exit.
            break

et commencez à l'exécuter en même temps que vos autres tâches.

Vous avez également le problème que dans votre _watchdog il place la méthode logoff dans la file d'attente - programmant ainsi un message qui sera envoyé plus tard, par l'opérateur _sender et annule ensuite immédiatement toutes les tâches, de sorte que la tâche _sender tâche n'aura probablement pas l'occasion de voir le message et d'y réagir ! En général, je trouve que mon code fonctionne mieux lorsque je n'utilise les tâches que lorsque c'est nécessaire. Au lieu d'avoir une tâche d'envoi et de mettre les messages dans une file d'attente lorsque vous voulez les envoyer, pourquoi ne pas demander au code qui veut envoyer un message d'appeler stream.send_all directement ? La seule chose à laquelle vous devez faire attention est que si vous avez plusieurs tâches qui peuvent envoyer des choses simultanément, vous pouvez utiliser une fonction trio.Lock() pour s'assurer qu'ils ne se heurtent pas l'un à l'autre en appelant send_all en même temps :

async def send_all(self, data):
    async with self.send_lock:
        await self.send_stream.send_all(data)

async def do_logoff(self):
    # First send the message
    await self.send_all(b"Logoff\r\n\r\n")
    # And then, *after* the message has been sent, cancel the tasks
    self.nursery.cancel()

Si vous procédez de cette manière, vous pourrez peut-être vous débarrasser de la tâche du chien de garde et de la fonction _end_session l'événement dans son intégralité.

Quelques autres remarques sur votre code pendant que je suis ici :

  • Appel trio.run plusieurs fois comme cela est inhabituel. Le style normal est de l'appeler une fois au début de votre programme, et de mettre tout votre vrai code à l'intérieur. Une fois que vous quittez trio.run tous les états de trio sont perdus, vous n'exécutez aucune tâche concurrente (il n'y a donc aucune chance que quoi que ce soit puisse se produire), et vous n'êtes pas en train d'exécuter des tâches simultanées. éventuellement sont à l'écoute et remarquent votre appel à _end_session.set() !). Et en général, presque toutes les fonctions Trio supposent que vous êtes déjà à l'intérieur d'un appel à trio.run . Il s'avère que vous pouvez dès à présent appeler trio.Queue() avant de démarrer le trio sans obtenir d'exception, mais ce n'est qu'une coïncidence.

  • L'utilisation d'un blindage à l'intérieur _sender me semble étrange. Le blindage est généralement une fonction avancée que vous presque jamais et je ne pense pas qu'il s'agisse d'une exception.

J'espère que cela vous aidera ! Et si vous voulez parler davantage de questions de style/conception comme celle-ci, mais que vous craignez qu'elles soient trop vagues pour stack overflow ("ce programme est-il bien conçu ?"), n'hésitez pas à passer sur le site de la canal de discussion en trio .


[1] En fait, le trio choisit probablement la tâche principale pour diverses raisons, mais ce n'est pas garanti et, de toute façon, cela ne fait pas de différence ici.

Prograide.com

Prograide est une communauté de développeurs qui cherche à élargir la connaissance de la programmation au-delà de l'anglais.
Pour cela nous avons les plus grands doutes résolus en français et vous pouvez aussi poser vos propres questions ou résoudre celles des autres.

Powered by:

X