129 votes

RabbitMQ / AMQP: file d'attente unique, plusieurs consommateurs pour le même message?

Je commence tout juste à utiliser RabbitMQ et AMQP en général.

  • J'ai une file d'attente de messages
  • J'ai plusieurs consommateurs, dont je voudrais faire des choses différentes avec le même message.

La plupart des RabbitMQ documentation semble focalisée sur les round-robin", c'est à dire où un seul message est consommé par un consommateur unique, avec la charge étant réparti entre chaque consommateur. C'est en effet le comportement je suis témoin.

Un exemple: le producteur a une seule file d'attente et envoyer des messages à toutes les 2 sec:

var amqp = require('amqp');
var connection = amqp.createConnection({ host: "localhost", port: 5672 });
var count = 1;

connection.on('ready', function () {
  var sendMessage = function(connection, queue_name, payload) {
    var encoded_payload = JSON.stringify(payload);  
    connection.publish(queue_name, encoded_payload);
  }

  setInterval( function() {    
    var test_message = 'TEST '+count
    sendMessage(connection, "my_queue_name", test_message)  
    count += 1;
  }, 2000) 


})

Et voici un consommateur:

var amqp = require('amqp');
var connection = amqp.createConnection({ host: "localhost", port: 5672 });
connection.on('ready', function () {
  connection.queue("my_queue_name", function(queue){
    queue.bind('#'); 
    queue.subscribe(function (message) {
      var encoded_payload = unescape(message.data)
      var payload = JSON.parse(encoded_payload)
      console.log('Recieved a message:')
      console.log(payload)
    })
  })
})

Si je commence à la consommation deux fois, je peux voir que chaque consommateur est de consommer des messages alternatifs en round-robin comportement. Par exemple, je vais voir les messages 1, 3, 5 dans un terminal, 2, 4, 6 dans l'autre.

Ma question est:

  • Puis-je avoir chaque consommateur de recevoir les mêmes messages? C'est à dire, à la fois les consommateurs à avoir de message 1, 2, 3, 4, 5, 6? Qu'est-ce qui est appelé dans AMQP/RabbitMQ parler? Comment est-il configuré?

  • Est-ce fait couramment? Dois-je juste l'échange de router le message en deux files d'attente, avec un seul consommateur, à la place?

107voto

mikemaccana Points 7470
  • Puis-je avoir chaque consommateur de recevoir les mêmes messages? C'est à dire, à la fois les consommateurs à avoir de message 1, 2, 3, 4, 5, 6? Qu'est-ce qui est appelé dans AMQP/RabbitMQ parler? Comment est-il configuré?

Non, si les consommateurs sont sur la même file d'attente. De RabbitMQ est AMQP Concepts guide:

"il est important de comprendre que, dans AMQP 0-9-1, les messages sont équilibrées entre les consommateurs."

Cela semble impliquer que le round-robin comportement au sein d'une file d'attente est une donnée, et ne sont pas configurables. C'est à dire, des files d'attente sont nécessaires afin d'avoir le même message d'ID d'être manipulé par plusieurs consommateurs.

  • Est-ce fait couramment? Dois-je juste l'échange de router le message en deux files d'attente, avec un seul consommateur, à la place?

Non, il n'est pas, seule file d'attente/plusieurs consommateurs avec chaque chaque consommateur de la manipulation de la même ID de message n'est pas possible. Avoir l'échange de router le message sur deux files d'attente est en effet mieux.

Comme je n'ai pas besoin de trop complexe de routage, une sortance échange va gérer ce bien. Je n'ai pas trop se concentrer sur les Échanges plus tôt en tant que nœud-amqp est le concept d'un " défaut d'échange vous permettant de publier des messages à une connexion directement, cependant, la plupart des AMQP messages sont publiés à un échange spécifique.

Voici mon sortance échange, envoi et de réception:

var amqp = require('amqp');
var connection = amqp.createConnection({ host: "localhost", port: 5672 });
var count = 1;

connection.on('ready', function () {
  connection.exchange("my_exchange", options={type:'fanout'}, function(exchange) {   

    var sendMessage = function(exchange, payload) {
      console.log('about to publish')
      var encoded_payload = JSON.stringify(payload);
      exchange.publish('', encoded_payload, {})
    }

    // Recieve messages
    connection.queue("my_queue_name", function(queue){
      console.log('Created queue')
      queue.bind(exchange, ''); 
      queue.subscribe(function (message) {
        console.log('subscribed to queue')
        var encoded_payload = unescape(message.data)
        var payload = JSON.parse(encoded_payload)
        console.log('Recieved a message:')
        console.log(payload)
      })
    })

    setInterval( function() {    
      var test_message = 'TEST '+count
      sendMessage(exchange, test_message)  
      count += 1;
    }, 2000) 
 })
})

22voto

driushkin Points 1722

Il suffit de lire le tutoriel rabbitmq . Vous publiez un message pour échanger, pas pour faire la queue; il est ensuite acheminé vers les files d'attente appropriées. Dans votre cas, vous devez lier une file d'attente distincte pour chaque consommateur. De cette façon, ils peuvent utiliser les messages de manière totalement indépendante.

6voto

robthewolf Points 2793

Oui, chaque consommateur peut recevoir les mêmes messages. jetez un oeil à http://www.rabbitmq.com/tutorials/tutorial-three-python.html http://www.rabbitmq.com/tutorials/tutorial-four-python.html http://www.rabbitmq.com/tutorials/tutorial-five-python.html

différents moyens pour acheminer les messages. Je sais qu'ils sont pour python et java, mais il est bon de comprendre les principes, de décider ce que vous faites et ensuite trouver comment le faire en JS. Ses sons comme vous voulez faire un simple sortance (tutoriel 3), qui envoie les messages à toutes les files d'attente reliée à l'échange.

La différence avec ce que vous faites et ce que vous voulez faire est essentiellement que vous allez installer et l'échange ou le type de cette distribution. Sortance excahnges envoyer tous les messages à tous connectés les files d'attente. Chaque file d'attente auront un consommateur qui aura accès à tous les messages séparément.

Oui cela se fait couramment, c'est l'une des caractéristiques de toujours déléguée.

6voto

Peter Ritchie Points 18352

Le modèle d'envoi est une relation un à un. Si vous souhaitez "envoyer" à plusieurs destinataires, vous devez utiliser le modèle pub / sub. Voir http://www.rabbitmq.com/tutorials/tutorial-three-python.html pour plus de détails.

3voto

durai Points 11

RabbitMQ / AMQP: file d'attente unique, plusieurs consommateurs pour le même message et actualisation de la page.

 rabbit.on('ready', function () {    });
    sockjs_chat.on('connection', function (conn) {

        conn.on('data', function (message) {
            try {
                var obj = JSON.parse(message.replace(/\r/g, '').replace(/\n/g, ''));

                if (obj.header == "register") {

                    // Connect to RabbitMQ
                    try {
                        conn.exchange = rabbit.exchange(exchange, { type: 'topic',
                            autoDelete: false,
                            durable: false,
                            exclusive: false,
                            confirm: true
                        });

                        conn.q = rabbit.queue('my-queue-'+obj.agentID, {
                            durable: false,
                            autoDelete: false,
                            exclusive: false
                        }, function () {
                            conn.channel = 'my-queue-'+obj.agentID;
                            conn.q.bind(conn.exchange, conn.channel);

                            conn.q.subscribe(function (message) {
                                console.log("[MSG] ---> " + JSON.stringify(message));
                                conn.write(JSON.stringify(message) + "\n");
                            }).addCallback(function(ok) {
                                ctag[conn.channel] = ok.consumerTag; });
                        });
                    } catch (err) {
                        console.log("Could not create connection to RabbitMQ. \nStack trace -->" + err.stack);
                    }

                } else if (obj.header == "typing") {

                    var reply = {
                        type: 'chatMsg',
                        msg: utils.escp(obj.msga),
                        visitorNick: obj.channel,
                        customField1: '',
                        time: utils.getDateTime(),
                        channel: obj.channel
                    };

                    conn.exchange.publish('my-queue-'+obj.agentID, reply);
                }

            } catch (err) {
                console.log("ERROR ----> " + err.stack);
            }
        });

        // When the visitor closes or reloads a page we need to unbind from RabbitMQ?
        conn.on('close', function () {
            try {

                // Close the socket
                conn.close();

                // Close RabbitMQ           
               conn.q.unsubscribe(ctag[conn.channel]);

            } catch (er) {
                console.log(":::::::: EXCEPTION SOCKJS (ON-CLOSE) ::::::::>>>>>>> " + er.stack);
            }
        });
    });
 

Prograide.com

Prograide est une communauté de développeurs qui cherche à élargir la connaissance de la programmation au-delà de l'anglais.
Pour cela nous avons les plus grands doutes résolus en français et vous pouvez aussi poser vos propres questions ou résoudre celles des autres.

Powered by:

X