2014-11-09 2 views

ответ

1

Я не могу придумать способ, но для

  1. Получить все выполняющиеся и запланированные задачи по celery inspect

  2. перебирать их, чтобы увидеть, если ваша задача есть.

проверка this SO вопрос, как первый пункт делается.

удачи

1

Вы можете сделать вашу работу в курс других задач по какому-то запоминанию. Если вы используете ключ управления кешем (redis, memcached,/tmp, все, что удобно), вы можете заставить выполнение зависеть от этого ключа. Я использую redis в качестве примера.

from redis import Redis 

@app.task 
def run_only_one_instance(params): 
    try: 
     sentinel = Redis().incr("run_only_one_instance_sentinel") 
     if sentinel == 1: 
      #I am the legitimate running task 
      perform_task() 
     else: 
      #Do you want to do something else on task duplicate? 
      pass 
     Redis().decr("run_only_one_instance_sentinel") 
    except Exception as e: 
     Redis().decr("run_only_one_instance_sentinel") 
     # potentially log error with Sentry? 
     # decrement the counter to insure tasks can run 
     # or: raise e 
+0

Это предотвратит запуск задачи, если одна и та же задача * все еще работает *. Если такая же задача присутствует 3 раза в очереди, она будет выполняться три раза до тех пор, пока они не выполняются одновременно. – Ramast

+0

incr и decr являются атомарными, поэтому вы ошибаетесь. –

+1

Не меняет того факта, что мое предложение не соответствует вопросу (по крайней мере, больше). Я думаю, что вопрос прошел через изменения, так как я ответил. Принятый ответ тяжелый, но будет делать то, что задается. –

0

Я не знаю, что это собирается помочь вам больше, чем другие ответы, но там идет мой подход, следуя той же идее дается SRJ. Мне нужен был способ заблокировать мой сервер для запуска задач с одинаковым идентификатором для очереди. Поэтому я сделал общую функцию, чтобы помочь мне.

def is_task_active_or_registered(app, task_id): 

    i = app.control.inspect() 

    active_dict = i.active() 
    scheduled_dict = i.scheduled() 
    keys_set = set(active_dict.keys() + scheduled_dict.keys()) 
    tasks_ids_set = set() 

    for _dict in [active_dict, scheduled_dict]: 
     for k in keys_set: 
      for task in _dict[k]: 
       tasks_ids_set.add(task['id']) 

    if task_id in tasks_ids_set: 
     return True 
    else: 
     return False 

Итак, я использую его так:

В контексте, где мой объект сельдерей-приложение доступно, я определяю:

def check_task_can_not_run(task_id): 
    return is_task_active_or_registered(app=celery, task_id=task_id) 

И так, с моей просьбе клиента, я вызовите это check_task_can_not_run(...) и заблокируйте задачу от запуска в случае True.

Смежные вопросы