81 votes

zeromq : comment éviter l'attente infinie ?

Je viens de commencer à utiliser ZMQ. Je conçois une application dont le flux de travail est :

  1. un des nombreux clients (qui ont des adresses PULL aléatoires) PUSH une requête à un serveur à 5555
  2. le serveur est toujours en train d'attendre les PUSH des clients. Quand l'un d'eux arrive, un processus de travailleur est créé pour cette requête particulière. Oui, les processus de travail peuvent exister simultanément.
  3. Lorsque ce processus termine sa tâche, il envoie le résultat au client par PUSH.

Je suppose que l'architecture PUSH/PULL est adaptée à cela. Veuillez consulter le site corrigez-moi sur ce sujet.


Mais comment gérer ces scénarios ?

  1. le client_receiver.recv() attendra un temps infini si le serveur ne répond pas.
  2. le client peut envoyer une requête, mais elle échouera immédiatement après, et un processus travailleur restera bloqué sur server_sender.send() pour toujours.

Alors comment puis-je configurer quelque chose comme un délai d'attente dans le modèle PUSH/PULL ?


EDIT : Grâce aux suggestions de user938949, j'ai obtenu un réponse fonctionnelle et je le partage pour la postérité.

1 votes

Je ne suis pas un expert de 0mq, mais dans beaucoup de situations comme celle-ci, il est préférable que votre pool de travailleurs soit créé au démarrage plutôt que de créer des travailleurs en réponse aux messages. Peut-être que je ne vous comprends pas bien.

0 votes

Bon point. En fait, je prévois de pré-engraisser les travailleurs. Je viens de réaliser que cela peut être trivial avec 0mq.

88voto

minrk Points 10008

Si vous utilisez zeromq >= 3.0, vous pouvez définir l'option de socket RCVTIMEO :

client_receiver.RCVTIMEO = 1000 # in milliseconds

Mais en général, vous pouvez utiliser des pollers :

poller = zmq.Poller()
poller.register(client_receiver, zmq.POLLIN) # POLLIN for recv, POLLOUT for send

Et poller.poll() prend un délai d'attente :

evts = poller.poll(1000) # wait *up to* one second for a message to arrive.

evts sera une liste vide s'il n'y a rien à recevoir.

Vous pouvez sonder avec zmq.POLLOUT pour vérifier si un envoi va réussir.

Ou, pour gérer le cas d'un pair qui aurait échoué, un :

worker.send(msg, zmq.NOBLOCK)

pourrait suffire, qui retournera toujours immédiatement - en soulevant un ZMQError(zmq.EAGAIN) si l'envoi n'a pas pu se terminer.

1 votes

Pourriez-vous donner des précisions sur zmq.NOBLOCK ?

0 votes

Bonjour, devons-nous réenregistrer une socket (dans un poller) chaque fois que nous la déconnectons et la reconnectons ?

0 votes

Non, ce n'est que si vous fermez la prise et en ouvrez une nouvelle que vous devez vous réenregistrer.

18voto

aitchnyu Points 5048

C'était un hack rapide que j'ai faite après avoir consulté la réponse de l'utilisateur938949 et http://taotetek.wordpress.com/2011/02/02/python-multiprocessing-with-zeromq/ . Si vous faites mieux, veuillez poster votre réponse, Je recommande votre réponse .

Pour ceux qui veulent des solutions durables sur la fiabilité, référez-vous http://zguide.zeromq.org/page:all#toc64

La version 3.0 de zeromq (beta ATM) supporte délai d'attente dans ZMQ_RCVTIMEO et ZMQ_SNDTIMEO. http://api.zeromq.org/3-0:zmq-setsockopt

Serveur

Le zmq.NOBLOCK assure que lorsqu'un client n'existe pas, le send() ne se bloque pas.

import time
import zmq
context = zmq.Context()

ventilator_send = context.socket(zmq.PUSH)
ventilator_send.bind("tcp://127.0.0.1:5557")

i=0

while True:
    i=i+1
    time.sleep(0.5)
    print ">>sending message ",i
    try:
        ventilator_send.send(repr(i),zmq.NOBLOCK)
        print "  succeed"
    except:
        print "  failed"

Client

L'objet poller peut écouter sur plusieurs sockets récepteurs (voir le lien "Python Multiprocessing with ZeroMQ" ci-dessus). Je l'ai lié seulement sur récepteur de travail . Dans la boucle infinie, le client interroge avec un intervalle de 1000 ms. Le site chaussettes retourne un objet vide si aucun message n'a été reçu pendant ce temps.

import time
import zmq
context = zmq.Context()

work_receiver = context.socket(zmq.PULL)
work_receiver.connect("tcp://127.0.0.1:5557")

poller = zmq.Poller()
poller.register(work_receiver, zmq.POLLIN)

# Loop and accept messages from both channels, acting accordingly
while True:
    socks = dict(poller.poll(1000))
    if socks:
        if socks.get(work_receiver) == zmq.POLLIN:
            print "got message ",work_receiver.recv(zmq.NOBLOCK)
    else:
        print "error: message timeout"

0 votes

Est-ce que Python a select ? La bibliothèque de Ruby en possède une comme IO.select . Vous pouvez : if ZMQ.select([read_socket], nil, nil, 20); puts read_socket.recv; else; puts 'timeout 20 secs'; end

11voto

Adobri Points 126

L'envoi ne bloquera pas si vous utilisez ZMQ_NOBLOCK, mais si vous essayez de fermer la socket et le contexte, cette étape bloquera la sortie du programme .

La raison en est que la socket attend tout pair afin que les messages sortants soient assurés d'être mis en file d'attente . Pour fermer la socket immédiatement et vider les messages sortants du tampon, utilisez ZMQ_LINGER et mettez-le à 0

2 votes

Zmq.RCVTIMEO ne vous aidera pas si vous n'utilisez pas zmq.LINGER car après le délai d'attente, la socket ne sera toujours pas fermée. Ceci devrait être ajouté à la réponse choisie.

11voto

Mathieu Longtin Points 2327

Si vous n'attendez qu'un seul socket, plutôt que de créer un fichier Poller vous pouvez le faire :

if work_receiver.poll(1000, zmq.POLLIN):
    print "got message ",work_receiver.recv(zmq.NOBLOCK)
else:
    print "error: message timeout"

Vous pouvez utiliser cette option si votre délai d'attente change en fonction de la situation, au lieu de définir l'option suivante work_receiver.RCVTIMEO .

Prograide.com

Prograide est une communauté de développeurs qui cherche à élargir la connaissance de la programmation au-delà de l'anglais.
Pour cela nous avons les plus grands doutes résolus en français et vous pouvez aussi poser vos propres questions ou résoudre celles des autres.

Powered by:

X