如何检查芹菜的任务状态?
如何检查一个任务是否在芹菜(特别是,我使用芹菜Django)运行?
我已经阅读了文档,并且使用了Googlesearch,但是我看不到如下所示的调用:
my_example_task.state() == RUNNING
我的用例是我有一个外部(Java)服务转码。 当我发送要转码的文档时,我想检查运行该服务的任务是否正在运行,如果没有,则(重新)启动它。
我使用目前的稳定版本 – 2.4,我相信。
返回task_id(从.delay()给出),然后向芹菜实例询问状态:
x = method.delay(1,2) print x.task_id
当问,使用这个task_id得到一个新的AsyncResult:
from celery.result import AsyncResult res = AsyncResult("your-task-id") res.ready()
每个Task
对象都有一个.request
属性,它包含了它的AsyncRequest
对象。 因此,下面的行给出了一个Task task
的状态:
task.AsyncResult(task.request.id).state
从任务ID创buildAsyncResult
对象是 常见问题解答中build议的方式,当您唯一拥有的是任务ID时,可以获取任务状态。
但是,从Celery 3.x开始,有一些重要的警告,如果他们不注意,可能会咬人。 这实际上取决于具体的用例场景。
默认情况下,Celery不logging“正在运行”状态。
为了让Celerylogging任务正在运行,您必须将CELERY_TRACK_STARTED
设置为True
。 这是一个简单的任务,testing这个:
@app.task(bind=True) def test(self): print self.AsyncResult(self.request.id).state
当CELERY_TRACK_STARTED
为False
,默认情况下,即使任务已经启动,状态显示CELERY_TRACK_STARTED
PENDING
。 如果将CELERY_TRACK_STARTED
设置为True
,则状态将被STARTED
。
状态PENDING
意思是“我不知道”。
状态为PENDING
的AsyncResult
并不意味着Celery不知道任务的状态。 这可能是由于许多原因。
首先, AsyncResult
可以用无效的任务ID构build。 芹菜认为这样的“任务”
>>> task.AsyncResult("invalid").status 'PENDING'
好的,所以没有人AsyncResult
喂明显无效的id。 公平的,但也有效果, AsyncResult
也将考虑一个已经成功运行,但是芹菜已经被遗忘为PENDING
。 再次, 在一些使用情况下,这可能是一个问题。 部分问题取决于Celery如何configuration以保持任务的结果,因为它取决于结果后端中“墓碑”的可用性。 (“Tombstones”是Celery文档中用于logging任务结束的数据块的术语。)如果CELERY_IGNORE_RESULT
为True
则使用AsyncResult
根本不起作用。 一个更棘手的问题是,芹菜默认到期墓碑。 CELEREY_TASK_RESULT_EXPIRES
设置默认设置为24小时。 所以,如果你启动一个任务,并将其logging在长期存储中,并且更多的24小时后,你创build一个AsyncResult
,状态将是PENDING
。
所有“真正的任务”都是以PENDING
状态开始的。 所以得到一个任务PENDING
可能意味着这个任务被请求,但从来没有比这个更进步(无论什么原因)。 或者这可能意味着任务运行,但是芹菜忘记了它的状态。
哎哟! AsyncResult
不适用于我。 我还可以做些什么?
我更喜欢跟踪目标,而不是跟踪任务本身 。 我确实保留了一些任务信息,但是跟踪目标确实是次要的。 目标存储在独立于芹菜的存储中。 当请求需要执行计算取决于某个目标已经实现时,它检查目标是否已经达到,如果是,则使用该caching的目标,否则启动将影响目标的任务,并发送到发出HTTP请求的客户端响应,指示它应等待结果。
您也可以创build自定义状态并更新其值的任务执行。 这个例子来自文档:
@app.task(bind=True) def upload_files(self, filenames): for i, file in enumerate(filenames): if not self.request.called_directly: self.update_state(state='PROGRESS', meta={'current': i, 'total': len(filenames)})
http://celery.readthedocs.org/en/latest/userguide/tasks.html#custom-states
老问题,但我最近遇到这个问题。
如果你想获得task_id,你可以这样做:
import celery from celery_app import add from celery import uuid task_id = uuid() result = add.apply_async((2, 2), task_id=task_id)
现在你确切知道task_id是什么,现在可以使用它来获得AsyncResult:
# grab the AsyncResult result = celery.result.AsyncResult(task_id) # print the task id print result.task_id 09dad9cf-c9fa-4aee-933f-ff54dae39bdf # print the AsyncResult's status print result.status SUCCESS # print the result returned print result.result 4
尝试:
task.AsyncResult(task.request.id).state
这将提供芹菜任务状态。 如果芹菜任务已经处于失败状态,它将抛出一个例外:
raised unexpected: KeyError('exc_type',)
对于简单的任务,我们可以使用http://flower.readthedocs.io/en/latest/screenshots.html和http://policystat.github.io/jobtastic/来进行监控。;
对于复杂的任务,说一个处理很多其他模块的任务。 我们build议手动在特定任务单元上logging进度和消息。
我find了有用的信息
芹菜项目工人指导检查人员
对我来说,我正在检查Celery是否在运行。
inspect_workers = task.app.control.inspect() if inspect_workers.registered() is None: state = 'FAILURE' else: state = str(task.state)
你可以玩视察来获得你的需求。