Я использую 2 экземпляра сельдерея.Результат сельдерея, не получается
Конфигурация первой инстанции:
app = Celery('tasks', broker='amqp://[email protected]//')
app.conf.update(
CELERY_RESULT_BACKEND='amqp',
CELERY_TASK_RESULT_EXPIRES=18000,
CELERY_ACCEPT_CONTENT=['json'],
CELERY_TASK_SERIALIZER='json',
CELERY_ROUTES={
'task_polling': {
'queue': 'task_polling_queue'
},
'save_shell_task': {
'queue': 'save_shell_task_queue'
},
'save_validation_chain_task': {
'queue': 'save_validation_chain_task_queue'
},
'do_work': {
'queue': 'do_work_queue'
},
'send_mail': {
'queue': 'send_mail_queue'
}
},
)
@shared_task(name='do_work', ignore_result=True)
def do_work(_serialized_task):
for bla in blala:
do_something()
is_canceled = send_task('save_validation_chain_task', [],
{'_params': my_params}).get() == True
запускаемых с помощью следующей команды:
celery -A tasks worker --loglevel=info -Q do_work_queue,send_mail_queue
и второй один:
app = Celery()
app.conf.update(
CELERY_RESULT_BACKEND='amqp',
CELERY_TASK_RESULT_EXPIRES=18000,
CELERY_ACCEPT_CONTENT=['json'],
CELERY_TASK_SERIALIZER ='json',
CELERYBEAT_SCHEDULE={
'periodic_task': {
'task': 'task_polling',
'schedule': timedelta(seconds=1),
},
},
CELERY_ROUTES={
'task_polling': {
'queue': 'task_polling_queue'
},
'save_shell_task': {
'queue': 'save_shell_task_queue'
},
'save_validation_chain_task': {
'queue': 'save_validation_chain_task_queue'
},
'do_work': {
'queue': 'do_work_queue'
},
'send_mail': {
'queue': 'send_mail_queue'
}
},
)
@shared_task(name='save_shell_task', ignore_result=True)
def save_shell_task(_result):
ShellUpdate(_json_result=_result).to_db()
@shared_task(name='save_validation_chain_task', ignore_result=False)
def save_validation_chain_task(_result):
return ValidationChainUpdate(_json_result=_result).to_db()
Это один запускается с:
celery -A my_prog worker -B --concurrency=1 -P processes -Q task_polling_queue,save_shell_task_queue,save_validation_chain_task_queue
Проблема в том, что send_task(...).get()
не получает результат. Программа ждет в цикле.
Кажется, сельдерей не получил результат очереди или не ждет результата правой очереди. Проблема, конечно, связана с параметром -Q. Есть ли у вас какие-либо идеи, где может возникнуть проблема в конфигурации?
спасибо
EDIT: Глобальная идея состоит в том, чтобы иметь два экземпляра сельдерея с различными исходными кодами. Поэтому я решил перечислить очереди, чтобы удалить зависимость. Я действительно думаю, что именно поэтому результат не потребляется, поскольку я не могу указать результат очереди в команде, поскольку у этого есть имя переменной для каждого результата (очередь, созданная для каждого результата динамически по сельдериху). Любое решение сохранить два разных исходных кода для экземпляра сельдерея полезно для меня. Я бы хотел, чтобы избежать использования другого результата, поскольку объемный уровень очень низок.
Просто из любопытства, если вы сделали '«my_prog.save_validation_chain_task_queue'' вместо того, чтобы просто '» save_validation_chain_task_queue'' – user2097159
я исправил вызов. Это было 'save_validation_chain_task'. Рабочий хорошо вызван, но результат не получен. Если бы у меня было пространство имен my_prog, рабочий вообще не вызывался. – Julio