2014-11-14 3 views
2

Я использую 2 экземпляра сельдерея.Результат сельдерея, не получается

Конфигурация первой инстанции:

app = Celery('tasks', broker='amqp://[email protected]//') 

app.conf.update(
    CELERY_RESULT_BACKEND='amqp', 
    CELERY_TASK_RESULT_EXPIRES=18000, 
    CELERY_ACCEPT_CONTENT=['json'], 
    CELERY_TASK_SERIALIZER='json', 

    CELERY_ROUTES={ 
     'task_polling': { 
      'queue': 'task_polling_queue' 
     }, 
     'save_shell_task': { 
      'queue': 'save_shell_task_queue' 
     }, 
     'save_validation_chain_task': { 
      'queue': 'save_validation_chain_task_queue' 
     }, 
     'do_work': { 
      'queue': 'do_work_queue' 
     }, 
     'send_mail': { 
      'queue': 'send_mail_queue' 
     } 
    }, 
) 

@shared_task(name='do_work', ignore_result=True) 
def do_work(_serialized_task): 
    for bla in blala: 
     do_something() 
     is_canceled = send_task('save_validation_chain_task', [], 
           {'_params': my_params}).get() == True 

запускаемых с помощью следующей команды:

celery -A tasks worker --loglevel=info -Q do_work_queue,send_mail_queue 

и второй один:

app = Celery() 

app.conf.update(
    CELERY_RESULT_BACKEND='amqp', 
    CELERY_TASK_RESULT_EXPIRES=18000, 
    CELERY_ACCEPT_CONTENT=['json'], 
    CELERY_TASK_SERIALIZER ='json', 
    CELERYBEAT_SCHEDULE={ 
     'periodic_task': { 
      'task': 'task_polling', 
      'schedule': timedelta(seconds=1), 
     }, 
    }, 
    CELERY_ROUTES={ 
     'task_polling': { 
      'queue': 'task_polling_queue' 
     }, 
     'save_shell_task': { 
      'queue': 'save_shell_task_queue' 
     }, 
     'save_validation_chain_task': { 
      'queue': 'save_validation_chain_task_queue' 
     }, 
     'do_work': { 
      'queue': 'do_work_queue' 
     }, 
     'send_mail': { 
      'queue': 'send_mail_queue' 
     } 
    }, 
) 


@shared_task(name='save_shell_task', ignore_result=True) 
def save_shell_task(_result): 
    ShellUpdate(_json_result=_result).to_db() 


@shared_task(name='save_validation_chain_task', ignore_result=False) 
def save_validation_chain_task(_result): 
    return ValidationChainUpdate(_json_result=_result).to_db() 

Это один запускается с:

celery -A my_prog worker -B --concurrency=1 -P processes -Q task_polling_queue,save_shell_task_queue,save_validation_chain_task_queue 

Проблема в том, что send_task(...).get() не получает результат. Программа ждет в цикле.

Кажется, сельдерей не получил результат очереди или не ждет результата правой очереди. Проблема, конечно, связана с параметром -Q. Есть ли у вас какие-либо идеи, где может возникнуть проблема в конфигурации?

спасибо

EDIT: Глобальная идея состоит в том, чтобы иметь два экземпляра сельдерея с различными исходными кодами. Поэтому я решил перечислить очереди, чтобы удалить зависимость. Я действительно думаю, что именно поэтому результат не потребляется, поскольку я не могу указать результат очереди в команде, поскольку у этого есть имя переменной для каждого результата (очередь, созданная для каждого результата динамически по сельдериху). Любое решение сохранить два разных исходных кода для экземпляра сельдерея полезно для меня. Я бы хотел, чтобы избежать использования другого результата, поскольку объемный уровень очень низок.

+0

Просто из любопытства, если вы сделали '«my_prog.save_validation_chain_task_queue'' вместо того, чтобы просто '» save_validation_chain_task_queue'' – user2097159

+0

я исправил вызов. Это было 'save_validation_chain_task'. Рабочий хорошо вызван, но результат не получен. Если бы у меня было пространство имен my_prog, рабочий вообще не вызывался. – Julio

ответ

4

У вас есть правильная настройка и конфигурация. Единственная проблема в том, что вы задали ignore_result для задания do_work.

@shared_task(name='do_work', ignore_result=True) 

При установке этого, даже если ваша задача завершается работником state задачи будет всегда PENDING. Вот почему, когда вы делаете .get() по этой задаче, она никогда не завершает выполнение этого оператора.

Поскольку вы принимаете только json

CELERY_ACCEPT_CONTENT=['json'], 
CELERY_TASK_SERIALIZER ='json', 

также необходимо установить

CELERY_RESULT_SERIALIZER = 'json', 

в обоих ваших конфигов.

Примечание:

В вашем случае, ваш делают .get() на задаче ВНУТРИ другой задачи. Этого следует избегать. На данный момент это будет нормально работать. От celery 3.2 it will raise and error instead of warining.

Вы можете использовать chain, чтобы предотвратить запуск синхронных подзадач, если они соответствуют вашим потребностям. Когда цепочка вызывается, она возвращает объект async_result, у которого есть родительское свойство.Например,

task1 = add.s(1,2) 
task2 = add.s(5) 
task3 = add.s(10) 

result = chain(task1 | task2 | task3)() 

result.revoke(terminate=True)     # revokes task3 
result.parent.revoke(terminate=True)   # revokes task2 
result.parent.parent.revoke(terminate=True) # revokes task1 

Если они не подходят, вы можете использовать сигналы для вызова некоторых других задач/функций. Вот простой пример (я не тестировал этот код).

from celery.signals import task_success 

@app.task 
def small_task(): 
     print('small task completed') 


@app.task 
@task_success.connect(sender=small_task) 
def big_task(**kwargs): 
     print('called by small_task. LOL {0}'.format(kwargs)) 
+0

Исправлена ​​проблема с установкой 'CELERY_RESULT_SERIALIZER'. Спасибо. Что касается функции .get() внутри задачи, цепочка хороша, если у вас есть небольшой контекст для обмена между каждым работником. В моем случае у меня есть рабочий с большим контекстом. Для получения требуется только знать, должен ли работник продолжать работу над этой большой задачей или нет. Это обходное решение, так как невозможно отменить текущую задачу с сельдереем. Есть ли у вас лучшее предложение, чтобы он соответствовал 3.2? – Julio

+0

Я в основном использую цепочку задач с неизменной подписью. Вы можете использовать сигналы для своего случая. См. Обновленный метод. – ChillarAnand

+0

Спасибо, но я действительно не знаю, как обрабатывать команду остановки в вашей big_task. Мое понимание из вашей ссылки на сельдерей заключается в том, что я должен сохранить контекст в «memcached» и сократить большую задачу в маленькой цепочке. В моем случае очень сложно настроить большую задачу. У вас есть еще одно предложение настроить простую команду остановки? – Julio

Смежные вопросы