如何检查芹菜的任务状态?

如何检查一个任务是否在芹菜(特别是,我使用芹菜Django)运行?

我已经阅读了文档,并且使用了Googlesearch,但是我看不到如下所示的调用:

my_example_task.state() == RUNNING 

我的用例是我有一个外部(Java)服务转码。 当我发送要转码的文档时,我想检查运行该服务的任务是否正在运行,如果没有,则(重新)启动它。

我使用目前的稳定版本 – 2.4,我相信。

返回task_id(从.delay()给出),然后向芹菜实例询问状态:

 x = method.delay(1,2) print x.task_id 

当问,使用这个task_id得到一个新的AsyncResult:

 from celery.result import AsyncResult res = AsyncResult("your-task-id") res.ready() 

每个Task对象都有一个.request属性,它包含了它的AsyncRequest对象。 因此,下面的行给出了一个Task task的状态:

 task.AsyncResult(task.request.id).state 

从任务ID创buildAsyncResult对象 常见问题解答中build议的方式,当您唯一拥有的是任务ID时,可以获取任务状态。

但是,从Celery 3.x开始,有一些重要的警告,如果他们不注意,可能会咬人。 这实际上取决于具体的用例场景。

默认情况下,Celery不logging“正在运行”状态。

为了让Celerylogging任务正在运行,您必须将CELERY_TRACK_STARTED设置为True 。 这是一个简单的任务,testing这个:

 @app.task(bind=True) def test(self): print self.AsyncResult(self.request.id).state 

CELERY_TRACK_STARTEDFalse ,默认情况下,即使任务已经启动,状态显示CELERY_TRACK_STARTED PENDING 。 如果将CELERY_TRACK_STARTED设置为True ,则状态将被STARTED

状态PENDING意思是“我不知道”。

状态为PENDINGAsyncResult并不意味着Celery不知道任务的状态。 这可能是由于许多原因。

首先, AsyncResult可以用无效的任务ID构build。 芹菜认为这样的“任务”

 >>> task.AsyncResult("invalid").status 'PENDING' 

好的,所以没有人AsyncResult明显无效的id。 公平的,但也有效果, AsyncResult也将考虑一个已经成功运行,但是芹菜已经被遗忘为PENDING 再次, 在一些使用情况下,这可能是一个问题。 部分问题取决于Celery如何configuration以保持任务的结果,因为它取决于结果后端中“墓碑”的可用性。 (“Tombstones”是Celery文档中用于logging任务结束的数据块的术语。)如果CELERY_IGNORE_RESULTTrue则使用AsyncResult根本不起作用。 一个更棘手的问题是,芹菜默认到期墓碑。 CELEREY_TASK_RESULT_EXPIRES设置默认设置为24小时。 所以,如果你启动一个任务,并将其logging在长期存储中,并且更多的24小时后,你创build一个AsyncResult ,状态将是PENDING

所有“真正的任务”都是以PENDING状态开始的。 所以得到一个任务PENDING可能意味着这个任务被请求,但从来没有比这个更进步(无论什么原因)。 或者这可能意味着任务运行,但是芹菜忘记了它的状态。

哎哟! AsyncResult不适用于我。 我还可以做些什么?

我更喜欢跟踪目标,而不是跟踪任务本身 。 我确实保留了一些任务信息,但是跟踪目标确实是次要的。 目标存储在独立于芹菜的存储中。 当请求需要执行计算取决于某个目标已经实现时,它检查目标是否已经达到,如果是,则使用该caching的目标,否则启动将影响目标的任务,并发送到发出HTTP请求的客户端响应,指示它应等待结果。

您也可以创build自定义状态并更新其值的任务执行。 这个例子来自文档:

 @app.task(bind=True) def upload_files(self, filenames): for i, file in enumerate(filenames): if not self.request.called_directly: self.update_state(state='PROGRESS', meta={'current': i, 'total': len(filenames)}) 

http://celery.readthedocs.org/en/latest/userguide/tasks.html#custom-states

老问题,但我最近遇到这个问题。

如果你想获得task_id,你可以这样做:

 import celery from celery_app import add from celery import uuid task_id = uuid() result = add.apply_async((2, 2), task_id=task_id) 

现在你确切知道task_id是什么,现在可以使用它来获得AsyncResult:

 # grab the AsyncResult result = celery.result.AsyncResult(task_id) # print the task id print result.task_id 09dad9cf-c9fa-4aee-933f-ff54dae39bdf # print the AsyncResult's status print result.status SUCCESS # print the result returned print result.result 4 

尝试:

task.AsyncResult(task.request.id).state

这将提供芹菜任务状态。 如果芹菜任务已经处于失败状态,它将抛出一个例外:

raised unexpected: KeyError('exc_type',)

对于简单的任务,我们可以使用http://flower.readthedocs.io/en/latest/screenshots.html和http://policystat.github.io/jobtastic/来进行监控。;

对于复杂的任务,说一个处理很多其他模块的任务。 我们build议手动在特定任务单元上logging进度和消息。

我find了有用的信息

芹菜项目工人指导检查人员

对我来说,我正在检查Celery是否在运行。

 inspect_workers = task.app.control.inspect() if inspect_workers.registered() is None: state = 'FAILURE' else: state = str(task.state) 

你可以玩视察来获得你的需求。