2012-04-09 4 views
5

У меня есть два отдельных процесса celeryd, работающих на моем сервере, которыми управляет supervisor. Они устанавливаются для прослушивания отдельных очередей, как например:Маршрутизация задачи сельдерея в определенной очереди

[program:celeryd1] 
command=/path/to/celeryd --pool=solo --queues=queue1 
... 

[program:celeryd2] 
command=/path/to/celeryd --pool=solo --queues=queue2 
... 

И мой celeryconfig выглядит примерно так:

from celery.schedules import crontab 

BROKER_URL = "amqp://guest:[email protected]:5672//" 

CELERY_DISABLE_RATE_LIMITS = True 
CELERYD_CONCURRENCY = 1 
CELERY_IGNORE_RESULT = True 

CELERY_DEFAULT_QUEUE = 'default' 
CELERY_QUEUES = { 
    'default': { 
     "exchange": "default", 
     "binding_key": "default", 
    }, 
    'queue1': { 
     'exchange': 'queue1', 
     'routing_key': 'queue1', 
    }, 
    'queue2': { 
     'exchange': 'queue2', 
     'routing_key': 'queue2', 
    }, 
} 

CELERY_IMPORTS = ('tasks',) 

CELERYBEAT_SCHEDULE = { 
    'first-queue': { 
     'task': 'tasks.sync', 
     'schedule': crontab(hour=02, minute=00), 
     'kwargs': {'client': 'client_1'}, 
     'options': {'queue': 'queue1'}, 
    }, 
    'second-queue': { 
     'task': 'tasks.sync', 
     'schedule': crontab(hour=02, minute=00), 
     'kwargs': {'client': 'client_2'}, 
     'options': {'queue': 'queue1'}, 
    }, 
} 

Все tasks.sync задачи должны быть направлены на определенную очередь (и, следовательно, celeryd прогресса). Но когда я пытаюсь запустить задачу вручную с помощью sync.apply_async(kwargs={'client': 'value'}, queue='queue1'), оба работника сельдерея поднимут задачу. Как я могу сделать маршрут задачи к правильной очереди и только запустить рабочий, который связан с очередью?

ответ

6

Вы используете только один экземпляр celerybeat?

Возможно, у вас есть старые привязки в очереди, которые сталкиваются с этим? Попробуйте запустить rabbitmqctl list_queues и rabbitmqctl list_bindings, Возможно, сбросьте данные в брокере, чтобы начать с нуля.

Пример, который у вас здесь, должен работать, и работает для меня, когда я просто попробовал.

Совет. Поскольку вы используете одно и то же значение exchange и binding_key в качестве имени очереди, , вам не нужно явно указывать их в CELERY_QUEUES. Когда CELERY_CREATE_MISSING_QUEUES включен (что по умолчанию), очереди будут автоматически созданы точно так же, как у вас есть , если вы просто сделаете celeryd -Q queue1 или отправите задачу в неопределенную очередь.

Смежные вопросы