RabbitMQ:与话题交换的持久消息

我对RabbitMQ很新。

我已经build立了一个“话题”交stream。 消费者可以在出版商之后开始。 我希望消费者能够接收已经发送的消息,而这些消息尚未被消费。

交易所设置了以下参数:

exchange_type => 'topic' durable => 1 auto_delete => 0 passive => 0 

消息使用此参数发布:

 delivery_mode => 2 

消费者使用get()从交换中检索消息。

不幸的是,任何客户端之前发布的消息都将丢失。 我使用了不同的组合。

我想我的问题是,交stream不举行消息。 也许我需要在发布者和队列之间有一个队列。 但是这似乎并不适用于通过密钥路由消息的“主题”交换。

任何想法我应该如何继续。 我使用Perl绑定Net :: RabbitMQ(不应该)和RabbitMQ 2.2.0。

如果没有可用的连接使用者在发布时处理消息,则需要一个持久队列来存储消息。

交换不存储消息,但是队列可以。 令人困惑的是,交易所可以被标记为“持久”,但所有真正意义上的交换本身仍然会在那里,如果你重新启动你的经纪人,但并不意味着发送到该交易所的任何消息都会自动持续下去。

鉴于此,这里有两个select:

  1. 在启动发布者之前执行pipe理步骤以自行创build队列。 您可以使用Web UI或命令行工具来执行此操作。 确保将其创build为一个持久队列,以便即使没有活动的使用者,它也会存储路由到它的所有消息。
  2. 假设您的消费者被编码为始终在启动时声明(并因此自动创build)他们的交换和队列(并且声明它们是持久的), 那么在启动任何发布者之前,至less运行一次所有消费者 。 这将确保您的所有队列被正确创build。 然后,您可以closures消费者,直到真正需要消费者,因为队列将持续存储路由到他们的未来消息。

我会去#1。 可能没有太多的步骤来执行,你总是可以脚本所需的步骤,以便他们可以重复。 此外,如果所有的消费者都将从同一个队列中抽取(而不是每个队列都有一个专用的队列),这确实是一个最小的pipe理开销。

队列是要妥善pipe理和控制的东西。 否则,你可能会最终与stream氓消费者声明持久队列,使用他们几分钟,但永远不会。 不久之后,你将会有一个永无止境的队伍,没有什么东西在缩小,还有即将到来的经纪人启示录。

正如Brian所提到的,交换不存储消息,主要负责将消息路由到其他交换机或队列。 如果交换没有被绑定到一个队列,那么所有发送到该交换的消息将被“丢失”

您不需要在发布者脚本中声明固定的客户端队列,因为这可能无法扩展。 队列可以由发布者dynamic创build,并使用交换 – 交换绑定进行内部路由。

RabbitMQ支持交换到交换绑定,这将允许拓扑的灵活性,解耦和其他好处。 你可以在这里阅读更多在RabbitMQ交换绑定[AMPQ]

RabbitMQ Exchange交换绑定

示例拓扑

Python代码示例,如果没有消费者使用队列,则使用持久性创build交换与交换绑定。

 #!/usr/bin/env python import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() #Declares the entry exchange to be used by all producers to send messages. Could be external producers as well channel.exchange_declare(exchange='data_gateway', exchange_type='fanout', durable=True, auto_delete=False) #Declares the processing exchange to be used.Routes messages to various queues. For internal use only channel.exchange_declare(exchange='data_distributor', exchange_type='topic', durable=True, auto_delete=False) #Binds the external/producer facing exchange to the internal exchange channel.exchange_bind(destination='data_distributor',source='data_gateway') ##Create Durable Queues binded to the data_distributor exchange channel.queue_declare(queue='trade_db',durable=True) channel.queue_declare(queue='trade_stream_service',durable=True) channel.queue_declare(queue='ticker_db',durable=True) channel.queue_declare(queue='ticker_stream_service',durable=True) channel.queue_declare(queue='orderbook_db',durable=True) channel.queue_declare(queue='orderbook_stream_service',durable=True) #Bind queues to exchanges and correct routing key. Allows for messages to be saved when no consumer is present channel.queue_bind(queue='orderbook_db',exchange='data_distributor',routing_key='*.*.orderbook') channel.queue_bind(queue='orderbook_stream_service',exchange='data_distributor',routing_key='*.*.orderbook') channel.queue_bind(queue='ticker_db',exchange='data_distributor',routing_key='*.*.ticker') channel.queue_bind(queue='ticker_stream_service',exchange='data_distributor',routing_key='*.*.ticker') channel.queue_bind(queue='trade_db',exchange='data_distributor',routing_key='*.*.trade') channel.queue_bind(queue='trade_stream_service',exchange='data_distributor',routing_key='*.*.trade')