用芹菜运行“独特”的任务

我使用芹菜更新我的新闻汇总站点的RSS订阅源。 我为每个feed使用一个@task,而且事情似乎很好。

有一个细节我不确定如何处理:所有的提要每分钟更新一次@periodic_task,但是如果一个提要仍然从最后一个定期任务更新,当一个新的启动? (例如,如果Feed非常慢或脱机,并且任务被保留在重试循环中)

目前我存储任务结果并检查他们的状态,如下所示:

import socket from datetime import timedelta from celery.decorators import task, periodic_task from aggregator.models import Feed _results = {} @periodic_task(run_every=timedelta(minutes=1)) def fetch_articles(): for feed in Feed.objects.all(): if feed.pk in _results: if not _results[feed.pk].ready(): # The task is not finished yet continue _results[feed.pk] = update_feed.delay(feed) @task() def update_feed(feed): try: feed.fetch_articles() except socket.error, exc: update_feed.retry(args=[feed], exc=exc) 

也许有一个更复杂/更强大的方式来实现相同的结果,使用一些我错过的芹菜机制?

从官方文档: 确保一个任务只执行一次

根据MattH的回答,你可以使用像这样的装饰器:

 def single_instance_task(timeout): def task_exc(func): @functools.wraps(func) def wrapper(*args, **kwargs): lock_id = "celery-single-instance-" + func.__name__ acquire_lock = lambda: cache.add(lock_id, "true", timeout) release_lock = lambda: cache.delete(lock_id) if acquire_lock(): try: func(*args, **kwargs) finally: release_lock() return wrapper return task_exc 

那么,像这样使用它…

 @periodic_task(run_every=timedelta(minutes=1)) @single_instance_task(60*10) def fetch_articles() yada yada... 

如果你正在寻找一个不使用Django的例子,那么试试这个例子 (注意:使用Redis代替我已经使用过)。

装饰者的代码如下(完全赞成文章的作者,阅读它)

 import redis REDIS_CLIENT = redis.Redis() def only_one(function=None, key="", timeout=None): """Enforce only one celery task at a time.""" def _dec(run_func): """Decorator.""" def _caller(*args, **kwargs): """Caller.""" ret_value = None have_lock = False lock = REDIS_CLIENT.lock(key, timeout=timeout) try: have_lock = lock.acquire(blocking=False) if have_lock: ret_value = run_func(*args, **kwargs) finally: if have_lock: lock.release() return ret_value return _caller return _dec(function) if function is not None else _dec 

使用https://pypi.python.org/pypi/celery_once似乎做的工作真的很好,包括报告错误和testing一些参数的唯一性。;

你可以做这样的事情:

 from celery_once import QueueOnce from myapp.celery import app from time import sleep @app.task(base=QueueOnce, once=dict(keys=('customer_id',))) def start_billing(customer_id, year, month): sleep(30) return "Done!" 

在你的项目中只需要下面的设置:

 ONCE_REDIS_URL = 'redis://localhost:6379/0' ONCE_DEFAULT_TIMEOUT = 60 * 60 # remove lock after 1 hour in case it was stale 

这个解决scheme适用于在单个主机上工作的celery,协议大于1.其他types(没有依赖关系像redis)locking差异基于文件的不兼容性更大1。

 class Lock(object): def __init__(self, filename): self.f = open(filename, 'w') def __enter__(self): try: flock(self.f.fileno(), LOCK_EX | LOCK_NB) return True except IOError: pass return False def __exit__(self, *args): self.f.close() class SinglePeriodicTask(PeriodicTask): abstract = True run_every = timedelta(seconds=1) def __call__(self, *args, **kwargs): lock_filename = join('/tmp', md5(self.name).hexdigest()) with Lock(lock_filename) as is_locked: if is_locked: super(SinglePeriodicTask, self).__call__(*args, **kwargs) else: print 'already working' class SearchTask(SinglePeriodicTask): restart_delay = timedelta(seconds=60) def run(self, *args, **kwargs): print self.name, 'start', datetime.now() sleep(5) print self.name, 'end', datetime.now()