2013-09-05 2 views
6

В настоящее время я использую django с сельдереем, и все работает нормально.Django Celery получить задание

Однако я хочу иметь возможность дать пользователям возможность отменить задачу, если сервер перегружен, проверяя, сколько задач в настоящее время запланировано.

Как я могу это достичь?

Я использую redis как брокер.

Я только что нашел это: Retrieve list of tasks in a queue in Celery

Это как-то относиться к моей проблеме, но мне не нужно перечислить задачи, просто сосчитать :)

ответ

8

Если ваш брокер настроен как redis://localhost:6379/1, и ваши задачи представляются общей celery очереди, то вы можете получить длину следующими способами:

import redis 
queue_name = "celery" 
client = redis.Redis(host="localhost", port=6379, db=1) 
length = client.llen(queue_name) 

Или, из скрипта (хорошо для мониторов и такие):

$ redis-cli -n 1 -h localhost -p 6379 llen celery 
+0

Несмотря на то, что это правильное решение для Redis брокера, пожалуйста, отметьте @stephen комментарий Fuhry в качестве правильного решения, как это брокер агностик , –

4

Если вы уже настроили Redis в вашем приложении, вы можете попробовать это:

from celery import Celery 

QUEUE_NAME = 'celery' 

celery = Celery(app) 
client = celery.connection().channel().client 

length = client.llen(QUEUE_NAME) 
+0

Для redis, 'client = app.broker_connection(). Channel(). Client' –

7

Вот как вы можете получить количество сообщений в очереди с помощью сельдерея, который брокерско агностик.

Используя connection_or_acquire, вы можете свести к минимуму количество открытых подключений к вашему брокеру, используя объединение внутренних подключений сельдерея.

celery = Celery(app) 

with celery.connection_or_acquire() as conn: 
    conn.default_channel.queue_declare(
     queue='my-queue', passive=True).message_count 

Вы также можете расширить сельдерей, чтобы обеспечить эту функциональность:

from celery import Celery as _Celery 


class Celery(_Celery) 

    def get_message_count(self, queue): 
     ''' 
     Raises: amqp.exceptions.NotFound: if queue does not exist 
     ''' 
     with self.connection_or_acquire() as conn: 
      return conn.default_channel.queue_declare(
       queue=queue, passive=True).message_count 


celery = Celery(app) 
num_messages = celery.get_message_count('my-queue') 
+2

Просьба также дать какое-то объяснение, чтобы поддержать ваш ответ. – Lal

+0

@Lal Добавлено некоторое объяснение подхода - надеюсь, что это поможет! –

+1

amqp.exceptions.NotFound: Queue.declare: (404) NOT_FOUND - no queue 'default' in vhost '/' Поскольку моя очередь не находится на хосте «/», она находится на хосте «/ apples». Как мне добраться до этого хоста? – Simanas

Смежные вопросы