2013-05-16 2 views
1

Так что для некоторых из моих задач на сельдерее 3.0.19, сельдерей, по-видимому, не уважая атрибут очереди, и вместо того, чтобы грузить задачу к очереди сельдерея по умолчаниюсельдерея не уважая атрибут очереди

/This is a stupid test with the proprietary code ripped out. 
def run_chef_task(task, **env): 
if env is None: 
    env = {} 
if not task_name is None: 
    env['CHEF'] = task_name 

print env 
cmd = [] 
if len(env): 
    cmd = ['env'] 
    for key, value in env.items(): 
     if not isinstance(key, str) or not isinstance(value, str): 
      raise TypeError(
       "Environment Values must be strings ({0}, {1})"\ 
       .format(key, value)) 
     key = "ND" + key.upper() 
     cmd.append('%s=%s' % (key, value)) 


cmd.extend(['/root/chef/run_chef', 'noudata_default']) 
print cmd 
ret = " ".join(cmd) 
ret = subprocess.check_call(cmd) 
print 'CHECK' 
return ret,cmd 

г = run_chef_task.apply_async (Args = [ 'mongo_backup], очереди = 'my_special_queue_with_only_one_worker') r.get() # возвращает немедленно

Перейти к цветку. Посмотрите задачу. Посмотрите на работника, чтобы эта задача продолжалась. Посмотрите, что рабочий отличается и что рабочий, над которым выполнялась задача, НЕ является специальным работником. Подтвердите, что Flower говорит, что «special_worker» находится только на «my_special_queue», а ТОЛЬКО «special_worker» не включен в «my_special_queue».

Теперь вот действительно интересная часть:

Подтяните RabbitMQ-менеджмент на брокера (и подтвердить, что брокер является брокером).
Было отправлено сообщение через посредника в правильной очереди правильному работнику (проверено). Сразу же после этого, еще одно сообщения получило отправлено в очереди сельдерея

И в логе-файле для работника, он говорит, что он принял и выполнил задачу:

[2013-05-16 02:24:15,455: INFO/MainProcess] Got task from broker: noto.tasks.chef_tasks.run_chef_task[0dba1107-2bb5-4c19-8df3-8a74d8e1234c] 
[2013-05-16 02:24:15,456: DEBUG/MainProcess] TaskPool: Apply <function _fast_trace_task at 0x2479c08> (args:('noto.tasks.chef_tasks.run_chef_task', '0dba1107-2bb5-4c19-8df3-8a74d8e1234c', ['mongo_backup'], {}, {'utc': True, 'is_eager': False, 'chord': None, 'group': None, 'args': ['mongo_backup'], 'retries': 0, 'delivery_info': {'priority': None, 'routing_key': u'', 'exchange': u'celery'}, 'expires': None, 'task': 'noto.tasks.chef_tasks.run_chef_task', 'callbacks': None, 'errbacks': None, 'hostname': 'manager1.i-6e958f0f', 'taskset': None, 'kwargs': {}, 'eta': None, 'id': '0dba1107-2bb5-4c19-8df3-8a74d8e1234c'}) kwargs:{}) 
// This is output from the task 
[2013-05-16 02:24:15,459: WARNING/PoolWorker-1] {'CHEF': 'mongo_backup'} 

[2013-05-16 02:24:15,463: WARNING/PoolWorker-1] ['env', 'NDCHEF=mongo_backup', '/root/chef/run_chef', 'default'] 
[2013-05-16 02:24:15,477: DEBUG/MainProcess] Task accepted: noto.tasks.chef_tasks.run_chef_task[0dba1107-2bb5-4c19-8df3-8a74d8e1234c] pid:17210 
...A bunch of boring debug logs repeating the registered tasks 
[2013-05-16 02:31:45,061: INFO/MainProcess] Task noto.tasks.chef_tasks.run_chef_task[0dba1107-2bb5-4c19-8df3-8a74d8e1234c] succeeded in 88.438395977s: (0, ['env', 'NDCHEF=mongo_backup',... 

Так это принимающая задачу, выполняющая задачу, и запускать ДРУГОЙ рабочий в ДРУГОЙ КОРОТКОЙ, ПОСЛЕДУЮЩИЙ, чтобы запустить его в одно и то же время вместо того, чтобы правильно возвращаться. Единственное, о чем я могу думать, это то, что этот рабочий является единственным, у которого есть правильный источник. Все остальные работники имеют старый источник с запросом на подпроцесс, поэтому они возвращаются более или менее мгновенно.

Кто-нибудь знает, что вызывает это? Это не единственная задача, когда мы видели, как это происходит, поскольку, похоже, для запуска этой программы выбрано 3 случайных компьютера. Есть ли что-то странное, что мы сделали с нашим celeryconfig, который мог бы вызвать это?

ответ

1

Ваш журнал TaskPool не предполагает явного маршрутизации см routing_key & по умолчанию «обмен»:

'delivery_info': {'priority': None, 'routing_key': u'', 'exchange': u'celery'} 

Я бы догадаться, этот вопрос является из автоматических дефолтов коробки. Рассмотрите возможность проверки явной ручной маршрутизации в конфигурации celery.

http://docs.celeryproject.org/en/latest/userguide/routing.html#manual-routing

, например:

CELERY_ROUTES = { 
"work-queue": { 
    "queue": "work_queue", 
    "binding_key": "work_queue" 
}, 
"new-feeds": { 
    "queue": "new_feeds", 
    "binding_key": "new_feeds" 
}, 
} 

CELERY_QUEUES = { 
"work_queue": { 
    "exchange": "work_queue", 
    "exchange_type": "direct", 
    "binding_key": "work_queue", 
}, 
"new_feeds": { 
    "exchange": "new_feeds", 
    "exchange_type": "direct", 
    "binding_key": "new_feeds" 
}, 
} 
+0

Это работало. Спасибо. –

Смежные вопросы