2013-09-13 5 views
2

Я пытаюсь сделать приложение с помощью виолончели. Он должен работать на нескольких рабочих, а разные рабочие потребляют из разных очередей. У меня есть что-то вроде этого:Сельдерей, задача, вызванная из другой задачи, не работает

@celery.task 
def task1(): 
    do_something() 
    task2.delay() 

@celery.task 
def task2() 
    do_something() 

так task1, который работает на worker1 должен вызвать TASK2, которые должны отправить в очереди, из которой потребляющего worker2. Проблема в том, что она не работает. Я получаю идентификатор AsyncResult, но состояние этой задачи - все время PENDING. Когда я вызываю task2 вручную из консоли python, он отлично работает. Может быть, я делаю что-то неправильно, и невозможно запустить одну задачу из другой? И еще одна вещь. Worker1 делает task1 и отправляет task2 в очередь, из которого он не потребляет - из этой очереди потребляется только worker2

+0

Конечно, можно вызвать одну задачу из другой, в вашем случае task2 из задачи1. Но вы, похоже, ожидаете, что задача1 отправит task2 в другую очередь, поэтому task1 находится в «queue1» и толкает task2 на «queue2». Можете ли вы сказать что-то еще о том, как вы называете task1 и начинаете сельдерей? Также вы знаете, что если вы хотите, чтобы task2 переходила в другую очередь, вы должны использовать метод apply_async с именем очереди в качестве аргумента: task2.apply_async (queue = "nameOfQ"), правильно? (Вы используете метод задержки в своем примере) –

ответ

0

Вот простой пример, который, я думаю, выполняет то, что вы хотите.

from celery import Celery 
import random 
import string 

celery = Celery('two_q',backend='amqp',broker='amqp://[email protected]//') 

@celery.task 
def generate_rand_string(n): 
    # n = number of characters 
    rand_str = "".join([random.choice(string.lowercase) for i in range(n)]) 
    #calls the second task and adds it to second queue 
    reverse.apply_async((rand_str,),queue="q2") 
    print rand_str 
    return rand_str 

@celery.task 
def reverse(s): 
    print s[::-1] 
    return s[::-1] 

generate_rand_string.apply_async((10,), queue="q1") 

При вызове с -Q аргумент, который определяет список очередей

celery worker --app=two_q -l info -Q q1,q2 

производит следующий вывод:

[email protected]:~/py/celery$ celery worker --app=two_q -l info -Q q1,q2 

-------------- [email protected] v3.0.23 (Chiastic Slide) 
---- **** ----- 
--- * *** * -- Linux-3.2.0-54-generic-pae-i686-with-Ubuntu-12.04-precise 
-- * - **** --- 
- ** ---------- [config] 
- ** ---------- .> broker:  amqp://[email protected]:5672// 
- ** ---------- .> app:   cel_group:0x9bfef8c 
- ** ---------- .> concurrency: 4 (processes) 
- *** --- * --- .> events:  OFF (enable -E to monitor this worker) 
-- ******* ---- 
--- ***** ----- [queues] 
-------------- .> q1:   exchange:q1(direct) binding:q1 
       .> q2:   exchange:q2(direct) binding:q2 

[Tasks] 
    . two_q.generate_rand_string 
    . two_q.reverse 

[2013-09-15 19:10:35,708: WARNING/MainProcess] [email protected] ready. 
[2013-09-15 19:10:35,716: INFO/MainProcess] consumer: Connected to amqp://[email protected]:5672//. 
[2013-09-15 19:10:40,731: INFO/MainProcess] Got task from broker: two_q.generate_rand_string[fa2ad56e-c66d-44a9-b908-2d95b2c9e5f3] 
[2013-09-15 19:10:40,767: WARNING/PoolWorker-1] jjikjkepkc 
[2013-09-15 19:10:40,768: INFO/MainProcess] Got task from broker: two_q.reverse[f52a8247-4674-4183-a826-d73cef1b64d4] 
[2013-09-15 19:10:40,770: INFO/MainProcess] Task two_q.generate_rand_string[fa2ad56e-c66d-44a9-b908-2d95b2c9e5f3] succeeded in 0.0217289924622s: 'jjikjkepkc' 
[2013-09-15 19:10:40,782: WARNING/PoolWorker-3] ckpekjkijj 
[2013-09-15 19:10:40,801: INFO/MainProcess] Task two_q.reverse[f52a8247-4674-4183-a826-d73cef1b64d4] succeeded in 0.0195469856262s: 'ckpekjkijj' 

Вы получаете две очереди (q1, q2) и двух рабочих.

Для сравнения, если вы либо назвать его без -Q аргумента или только с одной очереди:

celery worker --app=two_q -l info 

«обратный» задача не будет называться, потому что q2, к которому он добавлен не будет известен к сельдерею.

Надеюсь, это поможет.

Смежные вопросы