2013-03-22 4 views
3

Я использовал celery.chord (...) для создания группы задач и метода, который вызывается после выполнения всех задач в группе.Разрыв бесконечной петли в сельдерее

Я использую бэкэнд результата amqp (но я хочу переключиться на memcached).

Мой работник печатает эту линию снова и снова, каждую секунду. Я не знаю, как разбить этот бесконечный цикл. У меня есть доступ к веб-интерфейсу rabbitMQ, но я не могу найти что-то с идентификатором «32ba5fe4 -...».

[2013-03-22 14:18:26,896: INFO/MainProcess] Task celery.chord_unlock[32ba5fe4-918c-480f-8a78-a310c11d0c3a] retry: Retry in 1s 
[2013-03-22 14:18:26,897: INFO/MainProcess] Got task from broker: celery.chord_unlock[32ba5fe4-918c-480f-8a78-a310c11d0c3a] eta:[2013-03-22 13:18:27.895123+00:00] 

Это испытательная среда. Никакие данные не могут быть потеряны.

Я использую сельдерей 3.0.16

ответ

4

Он не должен быть бесконечным циклом.

Задача celery.chord_unlock проверяет, завершены ли подзадачи аккорда для вызова задачи обратного вызова слияния. Если он не планирует, он снова проверяет через секунду. Когда ваши задачи аккорда завершены, вы больше не увидите эти сообщения в журнале.

Редакция: Вы можете отменить задание chord_unlock, чтобы остановить цикл

celery.control.revoke('32ba5fe4-918c-480f-8a78-a310c11d0c3a') 
1

Я была такая же проблема. Чтобы остановить цикл, я установил flower, а затем отозвал задачу из меню Задачи в веб-интерфейсе. Кнопка Отменить находится на странице сведений о задаче, которая появляется после нажатия UUID задачи.

1

Для проверки вменяемости, я установил max_retries посредством сигнала при запуске рабочего:

from celery.signals import worker_init 
@worker_init.connect 
def limit_chord_unlock_tasks(worker, **kwargs): 
    """ 
    Set max_retries for chord.unlock tasks to avoid infinitely looping 
    tasks. (see celery/celery#1700 or celery/celery#2725) 
    """ 
    task = worker.app.tasks['celery.chord_unlock'] 
    if task.max_retries is None: 
     retries = getattr(worker.app.conf, 'CHORD_UNLOCK_MAX_RETRIES', None) 
     task.max_retries = retries 

А затем добавить CHORD_UNLOCK_MAX_RETRIES переменную в моей конфигурации сельдерея.

Смежные вопросы