删除芹菜/ rabbitmq中的所有待处理任务

如何在不知道每个任务的task_id情况下删除所有挂起的任务?

从文档 :

 $ celery -A proj purge 

要么

 from proj.celery import app app.control.purge() 

(编辑:更新与当前的方法。)

对于芹菜3.0+:

 $ celery purge 

清除特定队列:

 $ celery -Q queue_name purge 

对于Celery 2.x和3.x:

例如,使用带-Q参数的工作程序来定义队列时

 celery worker -Q queue1,queue2,queue3 

那么celery purge将不起作用,因为你不能传递队列参数给它。 它只会删除默认队列。 解决的方法是用像这样的--purge参数来启动你的工作:

 celery worker -Q queue1,queue2,queue3 --purge 

然而,这将运行工人。

其他选项是使用芹菜的amqp子命令

 celery amqp queue.delete queue1 celery amqp queue.delete queue2 celery amqp queue.delete queue3 

我发现celery purge不适用于我更复杂的芹菜configuration。 我使用多个命名队列用于不同的目的:

 $ sudo rabbitmqctl list_queues -p celery name messages consumers Listing queues ... # Output sorted, whitespaced for readability celery 0 2 celery@web01.celery.pidbox 0 1 celery@web02.celery.pidbox 0 1 apns 0 1 apns@web01.celery.pidbox 0 1 analytics 1 1 analytics@web01.celery.pidbox 0 1 bcast.361093f1-de68-46c5-adff-d49ea8f164c0 0 1 bcast.a53632b0-c8b8-46d9-bd59-364afe9998c1 0 1 celeryev.c27b070d-b07e-4e37-9dca-dbb45d03fd54 0 1 celeryev.c66a9bed-84bd-40b0-8fe7-4e4d0c002866 0 1 celeryev.b490f71a-be1a-4cd8-ae17-06a713cc2a99 0 1 celeryev.9d023165-ab4a-42cb-86f8-90294b80bd1e 0 1 

第一列是队列名称,第二列是在队列中等待的消息数量,第三列是该队列的侦听器数量。 队列是:

  • 芹菜 – 排队标准,幂等的芹菜任务
  • apns – 队列为Apple推送通知服务任务,不完全idempotent
  • 分析 – 排队长时间夜间分析
  • * .pidbox – 队列中的工作人员命令,如closures和重置,每个工作人员(2名芹菜工,1名工人,1名分析工作人员)
  • bcast。*广播队列,用于发送消息给所有听取队列的工作人员(而不是第一个抓取队列)
  • celeryev。* – 芹菜事件队列,用于报告任务分析

分析任务是在小数据集上运行良好的powershell任务,但现在需要超过24小时的时间才能处理。 有时候,一些东西会出错,它会在数据库中等待。 它需要重写,但在此之前,当它卡住我杀了任务,清空队列,然后再试一次。 我通过查看分析队列的消息计数(应该是0(完成分析)或1(等待昨晚的分析完成))来检测“卡住”。 2或更高版本是坏的,我收到一封电子邮件。

celery purge提供了从一个广播队列中删除任务,而我没有看到一个select一个不同的命名队列的选项。

这是我的过程:

 $ sudo /etc/init.d/celeryd stop # Wait for analytics task to be last one, Ctrl-C $ ps -ef | grep analytics # Get the PID of the worker, not the root PID reported by celery $ sudo kill <PID> $ sudo /etc/init.d/celeryd stop # Confim dead $ python manage.py celery amqp queue.purge analytics $ sudo rabbitmqctl list_queues -p celery name messages consumers # Confirm messages is 0 $ sudo /etc/init.d/celeryd start 

在芹菜3+:

CLI:

 $ celery -A proj purge 

编程方式:

 >>> from proj.celery import app >>> app.control.purge() 

http://docs.celeryproject.org/en/latest/faq.html#how-do-i-purge-all-waiting-tasks

在芹菜3+

http://docs.celeryproject.org/en/3.1/faq.html#how-do-i-purge-all-waiting-tasks

CLI

清除命名队列:

  celery -A proj amqp queue.purge <queue name> 

清除已configuration的队列

 celery -A proj purge 

我已经清除了邮件,但队列中还有留言吗? 答案:一旦实际执行任务,就立即确认(从队列中删除)任务。 工作人员收到任务后,需要一段时间才能真正执行,特别是如果有很多任务已经在等待执行。 未确认的消息由工作人员保留,直到closures与代理(AMQP服务器)的连接。 当连接closures时(例如,因为工作人员已经停止),这些任务将由代理重新发送给下一个可用的工作人员(或重启时的同一工作人员),因此要正确清除等待的任务队列必须停止所有的工作,然后使用celery.control.purge()清除任务。

因此,要清除整个队列,工作人员必须停止。

1.要正确清除等待任务队列,必须停止所有工作人员( http://celery.readthedocs.io/en/latest/faq.html#i-ve-purged-messages-but-there-are-仍然消息,在队列中; ):

 $ sudo rabbitmqctl stop 

或者(如果RabbitMQ /消息代理由Supervisorpipe理):

 $ sudo supervisorctl stop all 

2. …然后清除特定队列中的任务:

 $ cd <source_dir> $ celery amqp queue.purge <queue name> 

3.启动RabbitMQ:

 $ sudo rabbitmqctl start 

或者(如果RabbitMQ由主pipepipe理):

 $ sudo supervisorctl start all