RabbitMQ的例子:多个线程,通道和队列

我刚刚阅读了RabbitMQ的Java API文档 ,发现它非常翔实和直接。 如何设置一个简单的发布/消费Channel的例子很容易理解。 但这是一个非常简单/基本的例子,它给我留下了一个重要的问题: 我如何设置1 +多个Channels来发布/消费多个队列?

比方说,我有一个RabbitMQ服务器上有3个队列: loggingsecurity_eventscustomer_orders 。 所以我们要么需要一个Channel才能够发布/使用所有3个队列,或者更有可能有3个独立的Channels ,每个Channels专用于一个队列。

最重要的是,RabbitMQ的最佳实践规定我们为每个消费者线程设置1个Channel 。 对于这个例子,假设security_events只有一个消费者线程,但是loggingcustomer_order都需要5个线程来处理这个卷。 所以,如果我理解正确,那是否意味着我们需要:

  • 1个Channel和1个消费者线程,用于发布/消费security_events ; 和
  • 5个Channels和5个消费者线程,用于发布/消耗logging ; 和
  • 5个Channels和5个消费者线索用于发布/消费customer_orders

如果我的理解被误导了,请先纠正我。 无论哪种方式,一些厌倦了RabbitMQ的老手能帮助我用一个体面的代码示例“连接点”来设置符合我的要求的出版商/消费者吗? 提前致谢!

我认为你最初的理解有几个问题。 坦率地说,我有点惊讶地看到以下内容: both need 5 threads to handle the volume 。 你怎么确定你需要这个确切的数字? 你有任何保证5线程就足够了?

RabbitMQ经过debugging和时间testing,所有关于正确的devise和高效的消息处理。

让我们试着回顾一下问题并find一个合适的解决scheme。 顺便说一句,消息队列本身不会提供任何保证,你真的很好的解决scheme。 你必须明白你在做什么,并做一些额外的testing。

你一定知道有很多布局可能:

在这里输入图像说明

我将用布局B作为说明生产者N消费者问题的最简单方法。 既然你很担心吞吐量 顺便说一句,正如你可能期望RabbitMQperformance相当好( 来源 )。 注意prefetchCount ,稍后我会解决它:

在这里输入图像说明

所以消息处理逻辑可能是确保你有足够吞吐量的一个合适的地方。 当然,每当你需要处理一条消息时,你可以跨越一个新的线程,但最终这样的方法会杀死你的系统。 基本上,更多的线程会让你得到更大的延迟(如果你愿意,你可以检查Amdahl的法则 )。

在这里输入图像说明

(参见Amdahl法则 )

提示#1:小心线程,使用ThreadPools( 细节 )

线程池可以描述为Runnable对象(工作队列)的集合和运行线程的连接。 这些线程不断运行,正在检查新工作的工作查询。 如果有新的工作要做,他们执行这个Runnable。 Thread类自身提供了一个方法,例如execute(Runnable r)将新的Runnable对象添加到工作队列中。

 public class Main { private static final int NTHREDS = 10; public static void main(String[] args) { ExecutorService executor = Executors.newFixedThreadPool(NTHREDS); for (int i = 0; i < 500; i++) { Runnable worker = new MyRunnable(10000000L + i); executor.execute(worker); } // This will make the executor accept no new threads // and finish all existing threads in the queue executor.shutdown(); // Wait until all threads are finish executor.awaitTermination(); System.out.println("Finished all threads"); } } 

提示#2:小心消息处理开销

我会说这是明显的优化技术。 您可能会发送小而简单的消息。 整个方法是关于更小的消息被连续设置和处理。 大消息最终会玩坏玩笑,所以最好避免这种情况。

在这里输入图像说明

所以最好发送一小段信息,但是处理呢? 每次提交工作都会有开销。 批量处理对于高传入消息速率可能非常有用。

在这里输入图像说明

例如,假设我们有简单的消息处理逻辑,并且每次处理消息时我们不希望线程特定的开销。 为了优化, CompositeRunnable can be introduced非常简单的CompositeRunnable can be introduced

 class CompositeRunnable implements Runnable { protected Queue<Runnable> queue = new LinkedList<>(); public void add(Runnable a) { queue.add(a); } @Override public void run() { for(Runnable r: queue) { r.run(); } } } 

或者采取稍微不同的方式,通过收集要处理的邮件:

 class CompositeMessageWorker<T> implements Runnable { protected Queue<T> queue = new LinkedList<>(); public void add(T message) { queue.add(message); } @Override public void run() { for(T message: queue) { // process a message } } } 

以这种方式,您可以更有效地处理消息。

提示#3:优化消息处理

尽pipe你知道可以并行处理消息( Tip #1 ),并减less处理开销( Tip #2 ),你必须做的一切都很快。 冗余的处理步骤,繁重的循环等可能会影响性能。 请看有趣的案例研究:

在这里输入图像说明

通过select正确的XMLparsing器,将消息队列吞吐量提高十倍

提示#4:连接和通道pipe理

  • 在现有的连接上启动一个新的通道需要一次networking往返 – 启动一个新的连接需要几次。
  • 每个连接都使用服务器上的文件描述符。 渠道不。
  • 在一个频道上发布大量的信息将会阻止连接。 除此之外,复用是相当透明的。
  • 如果服务器过载,正在发布的连接可能会被阻塞 – 分离发布和使用连接是一个好主意
  • 准备处理消息突发

( 来源 )

请注意,所有提示完全一起工作。 随时让我知道,如果你需要更多的细节。

完整的消费者例子( 来源 )

请注意以下事项:

  • channel.basicQos(prefetch) – 正如你之前看到的prefetchCount可能非常有用:

    该命令允许用户select一个预取窗口,指定准备接收的未确认消息的数量。 通过将预取计数设置为非零值,代理将不会向消费者传递任何违反该限制的消息。 要向前移动窗口,消费者必须确认收到消息(或一组消息)。

  • ExecutorService threadExecutor – 您可以指定正确configuration的执行程序服务。

例:

 static class Worker extends DefaultConsumer { String name; Channel channel; String queue; int processed; ExecutorService executorService; public Worker(int prefetch, ExecutorService threadExecutor, , Channel c, String q) throws Exception { super(c); channel = c; queue = q; channel.basicQos(prefetch); channel.basicConsume(queue, false, this); executorService = threadExecutor; } @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { Runnable task = new VariableLengthTask(this, envelope.getDeliveryTag(), channel); executorService.submit(task); } } 

您也可以检查以下内容:

  • 解决scheme使用队列devise?
  • 一些排队理论:吞吐量,延迟和带宽
  • 快速消息队列基准:ActiveMQ,RabbitMQ,HornetQ,QPID,Apollo …

我如何设置1 +频道发布/消费多个队列?

您可以使用线程和通道来实现。 所有你需要的是对事物进行分类的一种方式,即login中的所有队列项目,来自security_events的所有队列元素等等。使用routingKey可以实现分类。

即:每当你添加一个项目到队列中时,指定路由密钥。 它将作为一个属性元素添加。 通过这个,你可以从一个特定的事件得到值, logging

下面的代码示例解释了如何在客户端完成它。

例如:

使用路由键标识通道的types并回顾types。

例如,如果您需要获取关于logintypes的所有通道,则必须将路由密钥指定为login名或其他关键字以标识该密码。

  Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "direct"); string routingKey="login"; channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes()); 

你可以在这里find关于分类的更多细节。


线程部分

一旦发布部分结束,您可以运行线程部分..

在这一部分,您可以根据类别获取已发布的数据。 即; 路由密钥,在你的情况是logging,security_events和customer_orders等

请查看示例以了解如何在线程中检索数据。

例如:

  ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); //**The threads part is as follows** channel.exchangeDeclare(EXCHANGE_NAME, "direct"); String queueName = channel.queueDeclare().getQueue(); // This part will biend the queue with the severity (login for eg:) for(String severity : argv){ channel.queueBind(queueName, EXCHANGE_NAME, routingKey); } boolean autoAck = false; channel.basicConsume(queueName, autoAck, "myConsumerTag", new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String routingKey = envelope.getRoutingKey(); String contentType = properties.contentType; long deliveryTag = envelope.getDeliveryTag(); // (process the message components here ...) channel.basicAck(deliveryTag, false); } }); 

现在创build一个处理typeslogin(路由键)的队列中的数据的线程。 通过这种方式你可以创build多个线程。 每个服务不同的目的。

看看这里的线程部分的更多细节..

为什么要自己实施一切?

尝试使用某种集成框架。 可以说, 骆驼已经有一堆连接到各种系统,兔子MQ包括骆驼兔MQ 。

你必须定义你的路线。 例如:

您希望将消息从5个并发使用者的logging队列中消费到一个文件中。

 from("rabbitmq://localhost/Logging ?concurrentConsumers=5") .to("file://yourLoggingFile") 

如何设置文件使用者有很多选项 。 正如你所看到的,你可以通过在你的URI中joinconcurrentConsumers=5来定义应该产生多less个消费者。 如果你想通过实现处理器接口,你可以创build自己的生产者或消费者。

这是非常灵活和function强大的框架,只需使用提供的组件即可完成大量工作。 该项目网站包含一堆的例子和文件。