找出是否存在芹菜任务

是否有可能找出具有特定任务ID的任务是否存在? 当我试图获得地位时,我会一直等待。

>>> AsyncResult('...').status 'PENDING' 

我想知道一个给定的任务ID是否是一个真正的芹菜任务ID,而不是一个随机的string。 我想要不同的结果取决于是否有一个有效的任务为一个特定的ID。

在过去可能有相同的ID有效的任务,但结果可能已经从后端删除。

在发送任务时,Celery不写入状态,这部分是优化(参见http://docs.celeryproject.org/en/latest/userguide/tasks.html#state )。

如果你真的需要它,添加它很简单:

 from celery import current_app # `after_task_publish` is available in celery 3.1+ # for older versions use the deprecated `task_sent` signal from celery.signals import after_task_publish @after_task_publish.connect def update_sent_state(sender=None, body=None, **kwargs): # the task may not exist if sent using `send_task` which # sends tasks by name, so fall back to the default result backend # if that is the case. task = current_app.tasks.get(sender) backend = task.backend if task else current_app.backend backend.store_result(body['id'], None, "SENT") 

然后你可以testingPENDING状态来检测一个任务没有(似乎)发送:

 >>> result.state != "PENDING" 

AsyncResult.state在未知的任务ID时返回PENDING。

PENDING

任务正在等待执行或未知。 任何未知的任务ID都暗示为未决状态。

http://docs.celeryproject.org/en/latest/userguide/tasks.html#pending

如果您需要区分未知的ID和现有的ID,您可以提供自定义的任务ID:

 >>> from tasks import add >>> from celery.utils import uuid >>> r = add.apply_async(args=[1, 2], task_id="celery-task-id-"+uuid()) >>> id = r.task_id >>> id 'celery-task-id-b774c3f9-5280-4ebe-a770-14a6977090cd' >>> if not "blubb".startswith("celery-task-id-"): print "Unknown task id" ... Unknown task id >>> if not id.startswith("celery-task-id-"): print "Unknown task id" ... 

现在我正在使用以下scheme:

  1. 获取任务ID。
  2. 设置为memcache键,如“task_%s”%task.id消息“Started”。
  3. 将任务ID传递给客户端。
  4. 现在从客户端,我可以监视任务状态(从任务消息设置为memcache)。
  5. 从准备好的任务到memcache密钥消息“就绪”。
  6. 从客户端准备好任务 – 启动将从内存caching中删除密钥的特殊任务,并执行必要的清理操作。

您需要在您创build的AsyncTask对象上调用.get()以真正从后端获取结果。

请参阅Celery FAQ 。


为了进一步澄清我的答案。

任何string在技术上都是有效的ID,无法validation任务ID。 找出任务是否存在的唯一方法是询问后端是否知道该任务,并且必须使用.get()

这引入了这样一个问题: .get()阻止后端没有任何有关您提供的任务ID的信息,这是devise允许您启动一个任务,然后等待其完成。

在原始问题的情况下,我将假设OP想要获得先前完成的任务的状态。 要做到这一点,你可以通过一个非常小的超时和捕获超时错误:

 from celery.exceptions import TimeoutError try: # fetch the result from the backend # your backend must be fast enough to return # results within 100ms (0.1 seconds) result = AsyncResult('blubb').get(timeout=0.1) except TimeoutError: result = None if result: print "Result exists; state=%s" % (result.state,) else: print "Result does not exist" 

如果没有办法知道任务ID是否有效,因为没有任何logging,这应该不用说,这只适用于后端存储结果。


甚至更多的澄清。

你想要做的事情不能使用AMQP后端来完成,因为它不存储结果,它转发它们 。

我的build议是切换到数据库后端,以便结果在一个数据库中,您可以在现有的芹菜模块之外查询。 如果结果数据库中不存在任何任务,则可以假定该ID无效。

尝试

 AsyncResult('blubb').state 

这可能工作。

它应该返回不同的东西。

如果我错了,请纠正我。

 if built_in_status_check(task_id) == 'pending' if registry_exists(task_id) == true print 'Pending' else print 'Task does not exist'