2013-03-22 4 views
24

Я использую сельдерей (3.0.15) с Redis в качестве брокера.Как проверить и отменить задачи сельдерея по названию задачи

Есть ли простой способ запросить количество задач с заданным именем, которые существуют в очереди сельдерея?

И в качестве продолжения есть способ отменить все задачи с заданным именем, которые существуют в очереди сельдерея?

Я прошел через Monitoring and Management Guide и не вижу там решения.

ответ

23
# Retrieve tasks 
# Reference: http://docs.celeryproject.org/en/latest/reference/celery.events.state.html 
query = celery.events.state.tasks_by_type(your_task_name) 

# Kill tasks 
# Reference: http://docs.celeryproject.org/en/latest/userguide/workers.html#revoking-tasks 
for uuid, task in query: 
    celery.control.revoke(uuid, terminate=True) 
+3

Это звучало многообещающе, но я не смог получить никаких результатов. Я установил 'CELERY_SEND_TASK_SENT_EVENT' для' True' для моих работников, но вызывает 'celery.events.State(). Tasks_by_type (...)' возвращает пустой список. – Mzzzzzz

+0

Uhm, попробуйте использовать * напрямую * 'celery.events.state.state', как' ceralmon'. [См. Источник] (https://github.com/celery/celerymon/blob/master/celerymon/handlers/api.py). – gioi

+0

Из оболочки сельдерея 'celery.events.state.state' по-прежнему дает мне пустой список. Есть ли отсутствующий шаг для инициализации потребления из очереди событий? – Mzzzzzz

1

Похоже flower обеспечивает мониторинг:

https://github.com/mher/flower

мониторинг с помощью Сельдерей События прогресса

задач и история Возможность показать детали задач в режиме реального времени (аргументы, начало время, время работы и т. д.) Графики и статистика Пульт дистанционного управления

Просмотра состояние работника и статистика Shutdown и рестарт рабочего размера экземпляров управления рабочего пула и автомасштаб Просмотр настроек и изменение очереди экземпляр рабочего потребляет от просмотра текущих запущенных задач Просмотра запланированных задач (ETA/обратный отсчет) Посмотреть отведенные и отозваны задачами Применить время и ограничение скорости для просмотра конфигурации Отозвать или прекратить ЗАДАЧИ HTTP API проверки подлинности

OpenID

2

Вы можете сделать это в одном запросе:

app.control.revoke([ 
    uuid 
    for uuid, _ in 
    celery.events.state.State().tasks_by_type(task_name) 
]) 
+0

Удивительное решение с одним вкладышем – emanuelcds

9

Существует одна проблема, с которой ранее ответы не затрагивались и могут отбрасывать людей, если они не знают об этом.

Среди этих решений уже писал, я хотел бы использовать Danielle's с одним небольшим изменением: Я бы импортировать задачу в свой файл и использовать его атрибут .name, чтобы получить имя задачи перейти на .tasks_by_type().

app.control.revoke(
    [uuid for uuid, _ in 
    celery.events.state.State().tasks_by_type(task.name)]) 

Однако это решение будет игнорировать те задачи, которые были запланированы для будущего исполнения. Как и некоторые люди, которые комментировали другие ответы, когда я проверил, что .tasks_by_type() return, у меня был пустой список. И действительно, мои очереди были пусты. Но я знал, что в будущем запланированы задачи, и эти были моей главной целью. Я мог видеть их, выполнив celery -A [app] inspect scheduled, но на них не повлиял приведенный выше код.

мне удалось отменить запланированные задачи, делая это:

app.control.revoke(
    [scheduled["request"]["id"] for scheduled in 
    chain.from_iterable(app.control.inspect().scheduled() 
         .itervalues())]) 

app.control.inspect().scheduled() возвращает словарь, ключи которого являются именами рабочих и значение списков информации планирования (следовательно, потребности в chain.from_iterable которая импортируется от itertools).Информация о задаче находится в поле "request" информации о планировании, а "id" содержит идентификатор задачи. Обратите внимание, что даже после аннулирования запланированная задача по-прежнему будет отображаться среди запланированных задач. Запланированные задачи, которые будут отменены, не будут удалены из списка запланированных задач до истечения срока их таймера или до тех пор, пока Celery не выполнит какую-либо операцию очистки. (Перезагрузка работников запускает такую ​​очистку.)

Смежные вопросы