3 votes

Mise en place du battement de coeur Rabbit MQ avec Kombu

Éditer:

Le principal problème est que la machine rabbitmq tierce semble tuer les connexions inactives de temps en temps. C'est à ce moment-là que je commence à recevoir des exceptions "Broken Pipe". La seule façon de remettre les communications en état normal est pour moi d'arrêter les processus et de les redémarrer. Je suppose qu'il y a une meilleure façon de faire?

--

Je suis un peu perdu ici. Je me connecte à un serveur RabbitMQ tiers pour envoyer des messages. De temps en temps, tous les sockets sur leur machine sont fermés et je finis par recevoir une exception "Broken Pipe".

On m'a dit de mettre en place une vérification de pulsation dans mon code mais je ne suis pas sûr de comment faire exactement. J'ai trouvé des informations ici: http://kombu.readthedocs.org/en/latest/changelog.html#version-2-3-0 mais pas d'exemple concret de code.

Faut-il seulement ajouter "?heartbeat=x" à la chaîne de connexion? Kombu fait-il le reste? Je vois que je dois appeler "Connection.heartbeat_check()" à "x/2". Dois-je créer une tâche périodique pour appeler cela? Comment la connexion se rétablit-elle?

J'utilise:

  • celery==3.0.12
  • kombu==2.5.4

Mon code ressemble à ceci actuellement. Une tâche simple Celery est appelée pour envoyer le message au serveur RabbitMQ tiers (suppression des journaux et des commentaires pour le simplifier, assez basique):

class SendMessageTask(Task):
    name = "campaign.backends.send"
    routing_key = "campaign.backends.send"
    ignore_result = True
    default_retry_delay = 60 # 1 minute.
    max_retries = 5

    def run(self, send_to, message, **kwargs):
    payload = "Message de test"

    try:
        conn = BrokerConnection(
        hostname=HOSTNAME,
        port=PORT,
        userid=USER_ID,
        password=PASSWORD,
        virtual_host=VHOST
        )

        with producers[conn].acquire(block=True) as producer:
        publish = conn.ensure(producer, producer.publish, errback=sending_errback, max_retries=3)
        publish(
            body=payload,
            routing_key=OUT_ROUTING_KEY,
            delivery_mode=2,
            exchange=EXCHANGE,
            serializer=None,
            content_type='text/xml',
            content_encoding = 'utf-8'
        )

    except Exception, ex:
        print ex

Merci pour toute aide apportée.

2voto

asksol Points 9574

Alors que vous pouvez certainement ajouter la prise en charge des battements cardiaques à un producteur, il est plus logique pour les processus consommateurs.

Activer les battements cardiaques signifie que vous devez envoyer des battements cardiaques régulièrement, par exemple si le battement cardiaque est réglé sur 1 seconde, alors vous devez envoyer un battement cardiaque chaque seconde ou plus, sinon le serveur distant fermera la connexion.

Cela signifie que vous devez utiliser un thread séparé ou utiliser IO asynchrone pour envoyer de manière fiable les battements cardiaques à temps, et comme une connexion ne peut pas être partagée entre les threads, cela nous laisse avec IO asynchrone.

La bonne nouvelle est que vous ne tirerez probablement pas beaucoup d'avantages à ajouter des battements cardiaques à une connexion de type production uniquement.

Prograide.com

Prograide est une communauté de développeurs qui cherche à élargir la connaissance de la programmation au-delà de l'anglais.
Pour cela nous avons les plus grands doutes résolus en français et vous pouvez aussi poser vos propres questions ou résoudre celles des autres.

Powered by:

X