Как я могу получить список задач в очереди, которые еще не обработаны?Получить список задач в очереди в сельдерей
ответ
Я думаю, что единственный способ получить ожидаемые задачи - сохранить список запущенных задач и позволить удаленной работе из списка при запуске.
С rabbitmqctl и list_queues вы можете получить представление о том, как много задач, которые ждут, но не себя задачи: http://www.rabbitmq.com/man/rabbitmqctl.1.man.html
Если то, что вы хотите, включает в себя задание обрабатывается, но еще не закончена, вы можете сохранить список вас задач и проверки их состояния:
from tasks import add
result = add.delay(4, 4)
result.ready() # True if finished
Или вы позволяете сельдерей хранить результаты с CELERY_RESULT_BACKEND и проверить, какие из ваших задач, не находятся там.
EDIT: см. Другие ответы для получения списка задач в очереди.
Вы должны посмотреть здесь: Celery Guide - Inspecting Workers
В основном это:
>>> from celery.task.control import inspect
# Inspect all nodes.
>>> i = inspect()
# Show the items that have an ETA or are scheduled for later processing
>>> i.scheduled()
# Show tasks that are currently active.
>>> i.active()
# Show tasks that have been claimed by workers
>>> i.reserved()
В зависимости от того, что вы хотите
Я пробовал это, но это очень медленно (например, 1 сек). Я использую его синхронно в приложении для торнадо, чтобы отслеживать прогресс, поэтому он должен быть быстрым. – JulienFr
Это не вернет список задач в очереди, которые еще не обработаны. –
Используйте 'i.reserved()' для получения списка заданий в очереди. – Banana
Для извлечения задач из бэкэнда, используйте этот
from amqplib import client_0_8 as amqp
conn = amqp.Connection(host="localhost:5672 ", userid="guest",
password="guest", virtual_host="/", insist=False)
chan = conn.channel()
name, jobs, consumers = chan.queue_declare(queue="queue_name", passive=True)
но «заданий» дает только количество задач в очереди – bitnik
если вы используете раввинов tMQ, используйте это в терминале:
sudo rabbitmqctl list_queues
он распечатает список очередей с количеством ожидающих задач. например:
Listing queues ...
0b27d8c59fba4974893ec22d478a7093 0
0e0a2da9828a48bc86fe993b210d984f 0
[email protected] 0
11926b79e30a4f0a9d95df61b6f402f7 0
15c036ad25884b82839495fb29bd6395 1
[email protected] 0
celery 166
celeryev.795ec5bb-a919-46a8-80c6-5d91d2fcf2aa 0
celeryev.faa4da32-a225-4f6c-be3b-d8814856d1b6 0
Число в правом столбце - количество задач в очереди. В приведенном выше примере очередь сельдерея имеет 166 ожидающих задач.
Я знаком с этим, когда у меня есть привилегии sudo, но я хочу, чтобы непривилегированный системный пользователь мог проверить - какие-либо предложения? – sage
Кроме того, вы можете использовать это через 'grep -e '^ celery \ s" | cut -f2', чтобы извлечь этот '166', если вы хотите обработать этот номер позже, скажем, для статистики. – jamesc
Модуль инспекции сельдерея, по-видимому, знает только о задачах с точки зрения рабочих. Если вы хотите просмотреть сообщения, находящиеся в очереди (пока их не потянули работники), я предлагаю использовать pyrabbit, который может взаимодействовать с httpbit http rabbitmq для извлечения всех видов информации из очереди.
Пример можно найти здесь: Retrieve queue length with Celery (RabbitMQ, Django)
Если вы не используете приоритизированные задачи, это на самом деле pretty simple, если вы используете Redis. Для того, чтобы получить отсчеты задачи:
redis-cli -h HOST -p PORT -n DATABASE_NUMBER llen QUEUE_NAME
Но, приоритетные задачи use a different key in redis, поэтому полная картина несколько сложнее. Полная картина в том, что вам нужно запросить redis для каждого приоритета задачи.В Python (и от проекта Flower), это выглядит следующим образом:
PRIORITY_SEP = '\x06\x16'
DEFAULT_PRIORITY_STEPS = [0, 3, 6, 9]
def make_queue_name_for_pri(queue, pri):
"""Make a queue name for redis
Celery uses PRIORITY_SEP to separate different priorities of tasks into
different queues in Redis. Each queue-priority combination becomes a key in
redis with names like:
- batch1\x06\x163 <-- P3 queue named batch1
There's more information about this in Github, but it doesn't look like it
will change any time soon:
- https://github.com/celery/kombu/issues/422
In that ticket the code below, from the Flower project, is referenced:
- https://github.com/mher/flower/blob/master/flower/utils/broker.py#L135
:param queue: The name of the queue to make a name for.
:param pri: The priority to make a name with.
:return: A name for the queue-priority pair.
"""
if pri not in DEFAULT_PRIORITY_STEPS:
raise ValueError('Priority not in priority steps')
return '{0}{1}{2}'.format(*((queue, PRIORITY_SEP, pri) if pri else
(queue, '', '')))
def get_queue_length(queue_name='celery'):
"""Get the number of tasks in a celery queue.
:param queue_name: The name of the queue you want to inspect.
:return: the number of items in the queue.
"""
priority_names = [make_queue_name_for_pri(queue_name, pri) for pri in
DEFAULT_PRIORITY_STEPS]
r = redis.StrictRedis(
host=settings.REDIS_HOST,
port=settings.REDIS_PORT,
db=settings.REDIS_DATABASES['CELERY'],
)
return sum([r.llen(x) for x in priority_names])
Если вы хотите, чтобы получить реальную задачу, вы можете использовать что-то вроде:
redis-cli -h HOST -p PORT -n DATABASE_NUMBER lrange QUEUE_NAME 0 -1
Оттуда вам придется десериализовать возвращенный список. В моем случае я был в состоянии сделать это с чем-то вроде:
r = redis.StrictRedis(
host=settings.REDIS_HOST,
port=settings.REDIS_PORT,
db=settings.REDIS_DATABASES['CELERY'],
)
l = r.lrange('celery', 0, -1)
pickle.loads(base64.decodestring(json.loads(l[0])['body']))
Просто быть предупрежден, что десериализации может занять несколько секунд, и вы должны будете настроить команды выше для работы с различными приоритетами.
Я пришел к выводу, что лучший способ получить количество заданий в очереди - использовать rabbitmqctl
, как было предложено несколько раз здесь. Для того, чтобы разрешить любой выбрал пользователь запускает команду с sudo
Я следовал инструкциям here (я пропустить редактирование профиля часть, как я не возражаю набрав Sudo перед командой.)
я схватил jamesc в grep
и cut
сниппет и завернул его в вызовы подпроцесса.
from subprocess import Popen, PIPE
p1 = Popen(["sudo", "rabbitmqctl", "list_queues", "-p", "[name of your virtula host"], stdout=PIPE)
p2 = Popen(["grep", "-e", "^celery\s"], stdin=p1.stdout, stdout=PIPE)
p3 = Popen(["cut", "-f2"], stdin=p2.stdout, stdout=PIPE)
p1.stdout.close()
p2.stdout.close()
print("number of jobs on queue: %i" % int(p3.communicate()[0]))
- 1. Сельдерей Получить список зарегистрированных задач
- 2. Как получить список задач в очереди GCD?
- 3. Сельдерей: выборочно очистка очереди от задач Python
- 4. Сельдерей - задачи очереди сыпучие
- 5. Получить сельдерей каждый журнал задач
- 6. Очереди с приоритетом сельдерей
- 7. сельдерей: получить сообщение в очереди как целое
- 8. Список задач в очереди с ActiveJob AsyncAdapter
- 9. Задачи сельдерей, работники и очереди
- 10. Получить количество задач в именованной очереди?
- 11. Как перенаправить цепочку задач в определенную очередь в сельдерей?
- 12. Лучший способ проверить несколько задач, выполняемых в сельдерей?
- 13. сельдерей: как обновить список запланированных задач после отзыва?
- 14. Группа сельдерей несколько задач в одном дизайне
- 15. Время регистрации в очереди заданий на сельдерей
- 16. Сельдерей Почему задача остается в очереди
- 17. Сельдерей для управления Java задач
- 18. Выключите сельдерей задержку задач на сервере разработки
- 19. Почему сельдерей.control.inspect сообщает меньше задач в очереди, чем rabbitmqctl?
- 20. Удаление всех задач в очереди ThreadPoolExecutor
- 21. Почему сельдерей не встраивает временную метку в сообщение в очереди?
- 22. Как удалить задачи из очереди задач сельдерея?
- 23. Обзор системы очереди задач
- 24. Выполнение очереди задач
- 25. Добавление задач в список задач в sharepoint
- 26. Получить список задач Gradle в безопасном типе
- 27. Как получить список доступных задач в gulp
- 28. Создать сельдерей нескольких очередей
- 29. Как получить неудачные задачи в сельдерей?
- 30. JBPM - Получить список всех задач
Какой бэкэнд вы используете? – Bacon
RabbitMQ, но я хочу получить этот список внутри Python. –