ActiveMQ:如何在使用临时队列时处理代理故障转移

在我的JMS应用程序上,我们使用生产者上的临时队列来接收消费者应用程序的回复。

我在这个线程中提到了完全相同的问题: http : //activemq.2283324.n4.nabble.com/jira-Created-AMQ-3336-Temporary-Destination-errors-on-HA-failover-in -broker-networking与故障转移-TT-td3551034.html#a3612738

每当我在我的networking中重新启动一个任意的代理时,我在使用消息应用程序日志中收到许多像这样的错误,同时尝试将答复发送到临时队列:

javax.jms.InvalidDestinationException: Cannot publish to a deleted Destination: temp-queue://ID:... 

然后,我看到加里在那里build议使用的回应

 jms.watchTopicAdvisories=false 

作为客户端brokerURL上的url参数。 我立即用这个额外的参数改变了我的客户经纪人url。 但是,现在我看到像这样的错误,当我重新启动networking中的经纪人进行此故障转移testing:

 javax.jms.JMSException: The destination temp-queue: //ID:client.host-65070-1308610734958-2:1:1 does not exist. 

我正在使用ActiveMQ 5.5版本。 而我的客户代理url如下所示:

 failover:(tcp://amq-host1:61616,tcp://amq-host2.tred.aol.com:61616,tcp://amq-host3:61616,tcp://amq-host4:61616)?jms.useAsyncSend=true&timeout=5000&jms.watchTopicAdvisories=false 

另外这里是我的四个经纪人之一的activemqconfigurationXML: amq1.xml

这里有人可以看看这个问题,并build议我在这个设置中犯了什么错误。

更新:

进一步澄清我在我的代码中如何做请求 – 响应:

  1. 我已经使用每个生产者的目的地(即临时队列),并将其设置在每封邮件的回复标题中。
  2. 我已经在JMSCorrelationID标头中发送了每个消息的唯一关联标识符。
  3. 据我所知即使骆驼和Spring也在使用临时队列来请求响应机制。 唯一不同的是,Spring JMS实现为每个消息创build和销毁临时队列,而我为生产者的生命周期创build临时队列。 当客户端(生产者)应用程序closures时,此临时队列被销毁,或者当AMQ代理程序意识到没有活动的生产者与此临时队列连接时,此临时队列被销毁。
  4. 我已经在Producer端的每条消息上设置了一个消息过期,这样消息就不会在队列中被搁置太久(60秒)。

有一个代理属性,org.apache.activemq.broker.BrokerService#cacheTempDestinations,应该有助于故障转移:情况。 在xmlconfiguration中将其设置为true,并且在客户端断开连接时,不会立即删除临时目标。 快速故障切换:重新连接将能够再次生成和/或从临时队列消耗。

有一个基于timeBeforePurgeTempDestinations(缺省5秒)的计时器任务来处理caching删除。

一个警告,但我没有看到在使用该属性的ActiveMQ核心的任何testing,所以我不能给你任何保证。

临时队列在请求回复场景中的请求者(生产者)所连接的代理上创build。 它们是从javax.jms.Session创build的,因此在该会话断开连接时,无论是因为客户端断开还是代理失败/故障切换,这些队列都将永久消失。 其他经纪人都不会理解当你的一个消费者试图回答这些队列时的含义; 因此你的例外。

这需要在思想上进行架构转变,假设您要处理故障转移并保留所有消息。 以下是可以攻击问题的一般方法:

  1. 您的回复标题应该引用特定于请求者进程的queue:response.<client id>例如queue:response.<client id> 。 如果您的客户端数量有限,则客户端ID可能是标准名称,如果您拥有大量客户端,则可能是UUID。
  2. 出站消息应该设置一个关联标识符(简单来说,可以让你把请求和响应关联起来 – 请求者毕竟可以同时发出多个请求)。 这在JMSCorrelationID标头中设置,并且应该从请求复制到响应消息。
  3. 请求者需要在该队列上设置一个侦听器,该侦听器将根据该相关ID将消息主体返回给请求的线程。 有一些multithreading代码需要为此编写,因为您需要手动pipe理类似于相关ID的映射到原始线程(可能通过Futures)。

这与Apache Camel针对消息传递的请求响应所采取的方法类似。

有一点需要注意的是,当客户端的时候队列不会消失,所以你应该设置一个时间来处理响应消息,这样如果它没有被使用,它会从代理中被删除,否则你会得到积压未消费的消息。 您还需要设置一个死信队列策略来自动丢弃过期的消息 。