RabbitMQ / AMQP:单个队列,多个消费者相同的消息?

一般来说,我只是开始使用RabbitMQ和AMQP。

  • 我有一个消息队列
  • 我有多个消费者,我想用同样的信息做不同的事情。

大部分的RabbitMQ文档似乎都集中在循环法(round-robin)上,也就是单个消费者消费单个消息的情况,负载分散在每个消费者之间。 这确实是我所见证的行为。

一个例子:生产者有一个单一的队列,每2秒发送一次消息:

var amqp = require('amqp'); var connection = amqp.createConnection({ host: "localhost", port: 5672 }); var count = 1; connection.on('ready', function () { var sendMessage = function(connection, queue_name, payload) { var encoded_payload = JSON.stringify(payload); connection.publish(queue_name, encoded_payload); } setInterval( function() { var test_message = 'TEST '+count sendMessage(connection, "my_queue_name", test_message) count += 1; }, 2000) }) 

这是一个消费者:

 var amqp = require('amqp'); var connection = amqp.createConnection({ host: "localhost", port: 5672 }); connection.on('ready', function () { connection.queue("my_queue_name", function(queue){ queue.bind('#'); queue.subscribe(function (message) { var encoded_payload = unescape(message.data) var payload = JSON.parse(encoded_payload) console.log('Recieved a message:') console.log(payload) }) }) }) 

如果我启动消费者两次, 我可以看到,每个消费者正在消耗轮stream行为的备用消息。 例如,我会在一个terminal看到消息1,3,5,在另一个terminal看到消息2,4,6

我的问题是:

  • 我可以让每个消费者收到相同的消息吗? 也就是说,两个消费者都得到消息1,2,3,4,5,6? 在AMQP / RabbitMQ中,这个叫什么? 它是如何configuration的?

  • 这是通常做的? 我应该把消息交换路线分成两个单独的队列,而不是一个消费者?

我可以让每个消费者收到相同的消息吗? 也就是说,两个消费者都得到消息1,2,3,4,5,6? 在AMQP / RabbitMQ中,这个叫什么? 它是如何configuration的?

不,如果消费者在同一队列中,则不行。 来自RabbitMQ的AMQP概念指南:

重要的是要明白,在AMQP 0-9-1中,消息之间的消息负载均衡。

这似乎意味着队列中的循环行为是给定的 ,而不是可configuration的。 也就是说,为了让多个消费者处理相同的消息ID,需要单独的队列。

这是通常做的? 我应该把消息交换路线分成两个单独的队列,而不是一个消费者?

不,它不是,单个队列/多个消费者,每个消费者处理相同的消息ID是不可能的。 把交换路由转换成两个独立的队列确实更好。

由于我不需要太复杂的路由, 扇出交换将很好地处理这个。 由于node-amqp具有允许您直接向连接发布消息的“默认交换”的概念,因此我之前不太关注Exchange,但是大多数AMQP消息都发布到特定交换机。

这是我的粉丝交换,发送和接收:

 var amqp = require('amqp'); var connection = amqp.createConnection({ host: "localhost", port: 5672 }); var count = 1; connection.on('ready', function () { connection.exchange("my_exchange", options={type:'fanout'}, function(exchange) { var sendMessage = function(exchange, payload) { console.log('about to publish') var encoded_payload = JSON.stringify(payload); exchange.publish('', encoded_payload, {}) } // Recieve messages connection.queue("my_queue_name", function(queue){ console.log('Created queue') queue.bind(exchange, ''); queue.subscribe(function (message) { console.log('subscribed to queue') var encoded_payload = unescape(message.data) var payload = JSON.parse(encoded_payload) console.log('Recieved a message:') console.log(payload) }) }) setInterval( function() { var test_message = 'TEST '+count sendMessage(exchange, test_message) count += 1; }, 2000) }) }) 

只要阅读rabbitmq教程 。 你发布消息交换,而不是排队; 然后被路由到适当的队列。 在你的情况下,你应该为每个消费者绑定单独的队列。 这样,他们可以完全独立地使用消息。

发送模式是一对一的关系。 如果你想“发送”给多个接收者,你应该使用pub / sub模式。 有关更多详细信息,请参阅http://www.rabbitmq.com/tutorials/tutorial-three-python.html

最后几个答案几乎是正确的 – 我有大量的应用程序生成的消息,需要结束与不同的消费者,所以这个过程是非常简单的。

如果您希望多个消费者使用相同的消息,请执行以下步骤。

在每个队列属性中为每个要接收消息的应用程序创build多个队列,并使用amq.direct交换“绑定”一个路由标记。 更改发布应用程序以发送到amq.direct并使用路由标记(不是队列)。 然后AMQP将把消息复制到具有相同绑定的每个队列中。 作品魅力:)

例如:假设我有一个我生成的JSONstring,我使用路由标记“new-sales-order”将它发布到“amq.direct”交换中,我有一个排列我的order_printer应用程序的打印顺序,我有一个排队等候我的结算系统,将发送订单的副本和发票的客户端,我有一个networking归档系统,我归档订单历史/法规遵从性的原因,我有一个客户端的网页界面,跟踪其他信息的订单订单。

所以我的队列是:order_printer,order_billing,order_archive和order_tracking所有绑定标签“新销售订单”绑定到他们,所有4将获得JSON数据。

这是发送数据的理想方式,不需要发布应用程序知道或关心接收应用程序。

是的,每个消费者可以收到相同的消息。 看看http://www.rabbitmq.com/tutorials/tutorial-three-python.html http://www.rabbitmq.com/tutorials/tutorial-four-python.html http://www.rabbitmq。 COM /教程/教程五python.html

用于路由消息的不同方式。 我知道他们是为Python和Java,但它很好理解的原则,决定你在做什么,然后find如何在JS做到这一点。 它听起来像你想做一个简单的扇出( 教程3 ),它发送消息到连接到交换所有的队列。

你在做什么和你想做什么的区别基本上是你要build立和交换或打字扇出。 扇出报文将所有消息发送到所有连接的队列。 每个队列将有一个消费者可以分别访问所有的消息。

是的,这是通常做的,这是AMPQ的function之一。

RabbitMQ / AMQP:单个队列,多个消费者进行相同的消息和页面刷新。

 rabbit.on('ready', function () { }); sockjs_chat.on('connection', function (conn) { conn.on('data', function (message) { try { var obj = JSON.parse(message.replace(/\r/g, '').replace(/\n/g, '')); if (obj.header == "register") { // Connect to RabbitMQ try { conn.exchange = rabbit.exchange(exchange, { type: 'topic', autoDelete: false, durable: false, exclusive: false, confirm: true }); conn.q = rabbit.queue('my-queue-'+obj.agentID, { durable: false, autoDelete: false, exclusive: false }, function () { conn.channel = 'my-queue-'+obj.agentID; conn.q.bind(conn.exchange, conn.channel); conn.q.subscribe(function (message) { console.log("[MSG] ---> " + JSON.stringify(message)); conn.write(JSON.stringify(message) + "\n"); }).addCallback(function(ok) { ctag[conn.channel] = ok.consumerTag; }); }); } catch (err) { console.log("Could not create connection to RabbitMQ. \nStack trace -->" + err.stack); } } else if (obj.header == "typing") { var reply = { type: 'chatMsg', msg: utils.escp(obj.msga), visitorNick: obj.channel, customField1: '', time: utils.getDateTime(), channel: obj.channel }; conn.exchange.publish('my-queue-'+obj.agentID, reply); } } catch (err) { console.log("ERROR ----> " + err.stack); } }); // When the visitor closes or reloads a page we need to unbind from RabbitMQ? conn.on('close', function () { try { // Close the socket conn.close(); // Close RabbitMQ conn.q.unsubscribe(ctag[conn.channel]); } catch (er) { console.log(":::::::: EXCEPTION SOCKJS (ON-CLOSE) ::::::::>>>>>>> " + er.stack); } }); }); 

要获得所需的行为,只需让每个消费者从自己的队列中消费。 您将不得不使用非直接交换types(主题,标题,扇出),以便一次将消息传递到所有队列。

正如我评估你的情况是:

  • 我有一个消息队列(接收消息的源代码,让我们把它命名为q111)

  • 我有多个消费者,我想用同样的信息做不同的事情。

这里的问题是,当这个队列接收到3条消息时,消息1被消费者A消费,其他消费者B和C消费消息2和3.当你需要一个设置时,rabbitmq通过相同的副本所有这三个消息(1,2,3)同时连接到所有三个连接的消费者(A,B,C)。

虽然许多configuration可以做到这一点,一个简单的方法是使用以下两个步骤的概念:

  • 使用dynamic的rabbitmq-shovel从所需的队列(q111)获取消息,并发布到扇出交换(专门为此目的创build和专用的交换)。
  • 现在,重新configuration您的消费者A,B和C(正在侦听队列(q111)),以便从每个消费者的直接匿名和匿名队列中直接侦听此Fanout交换机。

注意:在使用这个概念时,不要直接从源队列(q111)中消费,因为已经消耗的消息不会被推送到你的扇出交换中。

如果您认为这不符合您的确切要求…随时发布您的build议:-)

如果您碰巧正在使用amqplib库,那么他们有一个方便的示例 ,可以发现发布/订阅RabbitMQ教程 ,您可能会发现它很方便。

我想你应该检查使用扇出交换机发送消息。 这样你就会收到不同的消费者相同的消息,在表RabbitMQ正在创造这个新的消费者/用户的每一个differents队列。

这是在javascript https://www.rabbitmq.com/tutorials/tutorial-one-javascript.html中查看教程示例的链接;