如何在RabbitMQ中创build延迟队列?
使用Python,Pika和RabbitMQ创build延迟(或停放)队列的最简单方法是什么? 我已经看到类似的问题 ,但没有看到Python。
在devise应用程序时,我发现这是一个有用的想法,因为它允许我们限制需要重新排队的消息。
总是有可能收到比你能处理更多的消息,可能是HTTP服务器速度太慢,或者数据库处于太大的压力之下。
在发生丢失消息的容忍度为零的情况下出现问题时,我也发现它非常有用,而重新排队无法处理的消息可以解决这个问题。 它也可能会导致问题的地方,一遍又一遍的邮件。 可能导致性能问题,并logging垃圾邮件。
我发现这在开发我的应用程序时非常有用。 因为它给你一个简单的重新排队你的消息的select。 这可以轻松地降低代码的复杂性,是RabbitMQ中许多强大的隐藏function之一。
脚步
首先我们需要build立两个基本通道,一个用于主队列,另一个用于延迟队列。 在我最后的例子中,我包含了一些不需要的附加标志,但是使代码更加可靠。 如confirm delivery
, delivery_mode
和durable
。 您可以在RabbitMQ 手册中find更多关于这些信息。
在我们build立了通道之后,我们添加一个绑定到主通道,我们可以使用它从延迟通道发送消息到主队列。
channel.queue_bind(exchange='amq.direct', queue='hello')
接下来,我们需要configuration我们的延迟通道,以在消息到期后将消息转发到主队列。
delay_channel.queue_declare(queue='hello_delay', durable=True, arguments={ 'x-message-ttl' : 5000, 'x-dead-letter-exchange' : 'amq.direct', 'x-dead-letter-routing-key' : 'hello' })
-
x-message-ttl (消息 – 生存时间)
这通常用于在特定持续时间后自动删除队列中的旧消息,但是通过添加两个可选参数,我们可以更改此行为,而不是以毫秒为单位确定消息停留在延迟队列中的时间。
-
X-死信路由键
这个variables允许我们在消息到期后将消息转移到不同的队列中,而不是完全删除它的默认行为。
-
X-死信交换
此variables确定哪个Exchange用于将消息从hello_delay传输到hello队列。
发布到延迟队列
当我们完成所有基本的Pika参数设置时,只需使用基本发布向延迟队列发送消息。
delay_channel.basic_publish(exchange='', routing_key='hello_delay', body="test", properties=pika.BasicProperties(delivery_mode=2))
一旦你已经执行了脚本,你应该会看到在你的RabbitMQpipe理模块中创build的以下队列。
例。
import pika connection = pika.BlockingConnection(pika.ConnectionParameters( 'localhost')) # Create normal 'Hello World' type channel. channel = connection.channel() channel.confirm_delivery() channel.queue_declare(queue='hello', durable=True) # We need to bind this channel to an exchange, that will be used to transfer # messages from our delay queue. channel.queue_bind(exchange='amq.direct', queue='hello') # Create our delay channel. delay_channel = connection.channel() delay_channel.confirm_delivery() # This is where we declare the delay, and routing for our delay channel. delay_channel.queue_declare(queue='hello_delay', durable=True, arguments={ 'x-message-ttl' : 5000, # Delay until the message is transferred in milliseconds. 'x-dead-letter-exchange' : 'amq.direct', # Exchange used to transfer the message from A to B. 'x-dead-letter-routing-key' : 'hello' # Name of the queue we want the message transferred to. }) delay_channel.basic_publish(exchange='', routing_key='hello_delay', body="test", properties=pika.BasicProperties(delivery_mode=2)) print " [x] Sent"
你可以使用RabbitMQ官方插件: x-delayed-message 。
首先,将ez文件下载并复制到Your_rabbitmq_root_path / plugins中
其次,启用插件(不需要重启服务器):
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
最后,用“x-delay”标题发布消息,如:
headers.put("x-delay", 5000);
注意:
它不能确保你的邮件的安全,如果你的邮件只是在你的rabbitmq-server的停机时间过期,不幸的是邮件丢失。 所以在使用这个scheme时要小心 。
在rabbitmq-delayed-message-exchange中享受它和更多信息
仅供参考,如何在Spring 3.2.x中做到这一点。
<rabbit:queue name="delayQueue" durable="true" queue-arguments="delayQueueArguments"/> <rabbit:queue-arguments id="delayQueueArguments"> <entry key="x-message-ttl"> <value type="java.lang.Long">10000</value> </entry> <entry key="x-dead-letter-exchange" value="finalDestinationTopic"/> <entry key="x-dead-letter-routing-key" value="finalDestinationQueue"/> </rabbit:queue-arguments> <rabbit:fanout-exchange name="finalDestinationTopic"> <rabbit:bindings> <rabbit:binding queue="finalDestinationQueue"/> </rabbit:bindings> </rabbit:fanout-exchange>
NodeJS的实现。
代码中的一切都很清楚。 希望能节省一些时间。
var ch = channel; ch.assertExchange("my_intermediate_exchange", 'fanout', {durable: false}); ch.assertExchange("my_final_delayed_exchange", 'fanout', {durable: false}); // setup intermediate queue which will never be listened. // all messages are TTLed so when they are "dead", they come to another exchange ch.assertQueue("my_intermediate_queue", { deadLetterExchange: "my_final_delayed_exchange", messageTtl: 5000, // 5sec }, function (err, q) { ch.bindQueue(q.queue, "my_intermediate_exchange", ''); }); ch.assertQueue("my_final_delayed_queue", {}, function (err, q) { ch.bindQueue(q.queue, "my_final_delayed_exchange", ''); ch.consume(q.queue, function (msg) { console.log("delayed - [x] %s", msg.content.toString()); }, {noAck: true}); });