在Celery中检索队列中的任务列表

我如何检索队列中尚未处理的任务列表?

编辑:请参阅其他答案获取队列中的任务列表。

你应该看看这里: 芹菜指南 – 检查工人

基本上这个:

>>> from celery.task.control import inspect # Inspect all nodes. >>> i = inspect() # Show the items that have an ETA or are scheduled for later processing >>> i.scheduled() # Show tasks that are currently active. >>> i.active() # Show tasks that have been claimed by workers >>> i.reserved() 

取决于你想要什么

如果您使用的是rabbitMQ,请在terminal中使用:

 sudo rabbitmqctl list_queues 

它将打印具有待处理任务数量的队列列表。 例如:

 Listing queues ... 0b27d8c59fba4974893ec22d478a7093 0 0e0a2da9828a48bc86fe993b210d984f 0 10@torob2.celery.pidbox 0 11926b79e30a4f0a9d95df61b6f402f7 0 15c036ad25884b82839495fb29bd6395 1 celerey_mail_worker@torob2.celery.pidbox 0 celery 166 celeryev.795ec5bb-a919-46a8-80c6-5d91d2fcf2aa 0 celeryev.faa4da32-a225-4f6c-be3b-d8814856d1b6 0 

右列中的数字是队列中的任务数量。 在上面,芹菜队列有166个待定任务。

要从后端检索任务,请使用此

 from amqplib import client_0_8 as amqp conn = amqp.Connection(host="localhost:5672 ", userid="guest", password="guest", virtual_host="/", insist=False) chan = conn.channel() name, jobs, consumers = chan.queue_declare(queue="queue_name", passive=True) 

如果你不使用优先级任务,那么如果你使用Redis,这实际上很简单 。 要获得任务计数:

 redis-cli -h HOST -p PORT -n DATABASE_NUMBER llen QUEUE_NAME 

但是,优先级任务在redis中使用了不同的密钥 ,所以整个画面稍微复杂一些。 完整的画面是,你需要查询任务优先级的redis。 在python(和从花项目),这看起来像:

 PRIORITY_SEP = '\x06\x16' DEFAULT_PRIORITY_STEPS = [0, 3, 6, 9] def make_queue_name_for_pri(queue, pri): """Make a queue name for redis Celery uses PRIORITY_SEP to separate different priorities of tasks into different queues in Redis. Each queue-priority combination becomes a key in redis with names like: - batch1\x06\x163 <-- P3 queue named batch1 There's more information about this in Github, but it doesn't look like it will change any time soon: - https://github.com/celery/kombu/issues/422 In that ticket the code below, from the Flower project, is referenced: - https://github.com/mher/flower/blob/master/flower/utils/broker.py#L135 :param queue: The name of the queue to make a name for. :param pri: The priority to make a name with. :return: A name for the queue-priority pair. """ if pri not in DEFAULT_PRIORITY_STEPS: raise ValueError('Priority not in priority steps') return '{0}{1}{2}'.format(*((queue, PRIORITY_SEP, pri) if pri else (queue, '', ''))) def get_queue_length(queue_name='celery'): """Get the number of tasks in a celery queue. :param queue_name: The name of the queue you want to inspect. :return: the number of items in the queue. """ priority_names = [make_queue_name_for_pri(queue_name, pri) for pri in DEFAULT_PRIORITY_STEPS] r = redis.StrictRedis( host=settings.REDIS_HOST, port=settings.REDIS_PORT, db=settings.REDIS_DATABASES['CELERY'], ) return sum([r.llen(x) for x in priority_names]) 

如果你想得到一个实际的任务,你可以使用像这样的东西:

 redis-cli -h HOST -p PORT -n DATABASE_NUMBER lrange QUEUE_NAME 0 -1 

从那里你将不得不反序列化返回的列表。 在我的情况下,我能够做到这一点,如:

 r = redis.StrictRedis( host=settings.REDIS_HOST, port=settings.REDIS_PORT, db=settings.REDIS_DATABASES['CELERY'], ) l = r.lrange('celery', 0, -1) pickle.loads(base64.decodestring(json.loads(l[0])['body'])) 

只需要警告,反序列化可能需要一些时间,您需要调整上面的命令以处理各种优先级。

我认为获取正在等待的任务的唯一方法是保存您启动的任务列表,并让任务在启动时从列表中删除。

使用rabbitmqctl和list_queues,可以获得有多less任务在等待的概述,但不包括任务本身: http ://www.rabbitmq.com/man/rabbitmqctl.1.man.html

如果你想要包含正在处理的任务,但还没有完成,你可以保留你的任务列表,并检查其状态:

 from tasks import add result = add.delay(4, 4) result.ready() # True if finished 

或者让Celery用CELERY_RESULT_BACKEND存储结果,并检查哪些任务不在那里。

芹菜检查模块似乎只知道从工人的angular度来看任务。 如果你想查看队列中的消息(工作人员还没有拉),我build议使用pyrabbit ,它可以与rabbitmq http api连接,从队列中获取各种信息。

一个例子可以在这里find: 用芹菜(RabbitMQ,Django)检索队列长度

我已经得出结论,获得队列中作业数量的最佳方法是使用rabbitmqctl正如我们在这里多次rabbitmqctl那样。 为了允许任何select的用户使用sudo运行命令,我按照这里的说明操作(我没有跳过编辑configuration文件部分,因为我不介意在命令之前键入sudo)。

我也抓住了jamesc的grepcut片段,并将其包装在subprocess调用中。

 from subprocess import Popen, PIPE p1 = Popen(["sudo", "rabbitmqctl", "list_queues", "-p", "[name of your virtula host"], stdout=PIPE) p2 = Popen(["grep", "-e", "^celery\s"], stdin=p1.stdout, stdout=PIPE) p3 = Popen(["cut", "-f2"], stdin=p2.stdout, stdout=PIPE) p1.stdout.close() p2.stdout.close() print("number of jobs on queue: %i" % int(p3.communicate()[0]))