2013-06-25 2 views
2

У меня есть различные очереди Кролика каждый из которых посвящена особого вида обработки заказа:Можно ли использовать пользовательские маршруты для примитивов сельдерея?

# tasks.py 

@celery.task 
def process_order_for_product_x(order_id): 
    pass # elided ... 


@celery.task 
def process_order_for_product_y(order_id): 
    pass # elided ... 


# settings.py 

CELERY_QUEUES = { 
    "black_hole": { 
     "binding_key": "black_hole", 
     "queue_arguments": {"x-ha-policy": "all"} 
    }, 
    "product_x": { 
     "binding_key": "product_x", 
     "queue_arguments": {"x-ha-policy": "all"} 
    }, 
    "product_y": { 
     "binding_key": "product_y", 
     "queue_arguments": {"x-ha-policy": "all"} 
    }, 

Мы имеем политику применения явной маршрутизации путем установки CELERY_DEFAULT_QUEUE = 'black_hole', а затем никогда не потребляя от black_hole.

Каждая из этих задач можно использовать холст примитивов сельдерей, как и так:

# tasks.py 

@celery.task 
def process_order_for_product_x(order_id): 
    # These can run in parallel 
    stage_1_group = group(do_something.si(order_id), 
          do_something_else.si(order_id)) 

    # These can run in parallel 
    another_group = group(do_something_at_end.si(order_id), 
          do_something_else_at_end.si(order_id)) 

    # These run in a linear sequence 
    process_task = chain(
     stage_1_group, 
     do_something_dependent_on_stage_1.si(order_id), 
     another_group) 

    process_task.apply_async() 

Предположив Я хочу конкретные виды использования celery.group, celery.chord, celery.chord_unlock и другие задачи, холст течь через очередь для соответствующего продукта, вместо того, чтобы попасть в ловушку в black_hole, существует ли способ вызвать каждую конкретную задачу холста с именем пользовательской задачи или настраиваемой функцией routing_key?

По причинам, в которые я не поеду, я бы предпочел не отправлять все задания celery.* в очередь celery_canvas, что я и делаю тем временем.

ответ

4

Этот метод позволяет задач брезентовых маршрут сельдерей очереди задачи обратного вызова.

Можно указать настраиваемый маршрутизатор задач класса для сельдерея, как описано here.

Давайте сосредоточимся на задаче celery.chord_unlock. Его подпись определена here.

def unlock_chord(self, group_id, callback, ...): 

Второй позиционный аргумент - это подпись задачи обратного вызова аккорда.

Подписи задач в сельдерей в основном являются диктофонами, что дает нам возможность получить доступ к параметрам задачи, включая имя очереди задач.

Вот пример:

class CeleryRouter(object): 
    def route_for_task(self, task, args=None, kwargs=None): 
     if task == 'celery.chord_unlock': 
      callback_signature = args[1] 
      options = callback_signature.get('options') 
      if options: 
       queue = options.get('queue') 
       if queue: 
        return {'queue': queue} 

Добавьте его в сельдерея конфигурации:

CELERY_ROUTES = (CeleryRouter(), 
0

В настоящее время я использую сельдерей в своем проекте. Для некоторых сценариев мне нужна задача цепи, хотя различные очереди:

chain(get_staff.s(url), save_staff.s(dt, partner_id, url))() 

Эти две функции объявлены следующим образом:

@task(queue='celery_gevent') 
def get_staff(source_url): 

@task # send to default queue 
def save_staff(suggests, dt, partner, url): 

Кстати, celery_gevent обрабатывается работником с GEvent бассейн, чтобы сделать HTTP Запросы.

В этом примере, как вы можете указать очередь неявно. Также вы можете явно положить задачи в другую очередь, указав дополнительные Params, например, так:

In [1]: add.apply_async([4,5]) 
Out[1]: <AsyncResult: bda3dedd-c2c4-44db-be8e-6a97e718f8b0> 

$ sudo rabbitmqctl list_queues 
Listing queues ... 
celery 1 
...done. 

In [2]: add.apply_async([4,5], queue='your_product') 
Out[2]: <AsyncResult: 934f6161-298b-468b-9716-3da6fae58fa5> 

$ sudo rabbitmqctl list_queues 
Listing queues ... 
celery 1 
your_product 1 
...done. 

Вы можете запустить весь холст в пользовательской очереди:

process_task.apply_async(queue='your_queue') 

Попробуйте указать queue_name внутри @task декоратора , Это должно помочь.

Ссылки:

http://docs.celeryproject.org/en/latest/reference/celery.app.task.html

http://docs.celeryproject.org/en/latest/_modules/celery/app/task.html#Task.apply_async