Я хочу создать следующий поток с использованием конфигурации сельдерея \ API:Как избежать повторяющихся задач в сельдерее брокере
- Отправить Таску (ARGB) Только если очередь сельдерея не имеет Таски (ARGB) уже в ожидании
Возможно ли это? как?
Я хочу создать следующий поток с использованием конфигурации сельдерея \ API:Как избежать повторяющихся задач в сельдерее брокере
Возможно ли это? как?
Я не могу придумать способ, но для
Получить все выполняющиеся и запланированные задачи по celery inspect
перебирать их, чтобы увидеть, если ваша задача есть.
проверка this SO вопрос, как первый пункт делается.
удачи
Вы можете сделать вашу работу в курс других задач по какому-то запоминанию. Если вы используете ключ управления кешем (redis, memcached,/tmp, все, что удобно), вы можете заставить выполнение зависеть от этого ключа. Я использую redis в качестве примера.
from redis import Redis
@app.task
def run_only_one_instance(params):
try:
sentinel = Redis().incr("run_only_one_instance_sentinel")
if sentinel == 1:
#I am the legitimate running task
perform_task()
else:
#Do you want to do something else on task duplicate?
pass
Redis().decr("run_only_one_instance_sentinel")
except Exception as e:
Redis().decr("run_only_one_instance_sentinel")
# potentially log error with Sentry?
# decrement the counter to insure tasks can run
# or: raise e
Я не знаю, что это собирается помочь вам больше, чем другие ответы, но там идет мой подход, следуя той же идее дается SRJ. Мне нужен был способ заблокировать мой сервер для запуска задач с одинаковым идентификатором для очереди. Поэтому я сделал общую функцию, чтобы помочь мне.
def is_task_active_or_registered(app, task_id):
i = app.control.inspect()
active_dict = i.active()
scheduled_dict = i.scheduled()
keys_set = set(active_dict.keys() + scheduled_dict.keys())
tasks_ids_set = set()
for _dict in [active_dict, scheduled_dict]:
for k in keys_set:
for task in _dict[k]:
tasks_ids_set.add(task['id'])
if task_id in tasks_ids_set:
return True
else:
return False
Итак, я использую его так:
В контексте, где мой объект сельдерей-приложение доступно, я определяю:
def check_task_can_not_run(task_id):
return is_task_active_or_registered(app=celery, task_id=task_id)
И так, с моей просьбе клиента, я вызовите это check_task_can_not_run(...)
и заблокируйте задачу от запуска в случае True
.
Это предотвратит запуск задачи, если одна и та же задача * все еще работает *. Если такая же задача присутствует 3 раза в очереди, она будет выполняться три раза до тех пор, пока они не выполняются одновременно. – Ramast
incr и decr являются атомарными, поэтому вы ошибаетесь. –
Не меняет того факта, что мое предложение не соответствует вопросу (по крайней мере, больше). Я думаю, что вопрос прошел через изменения, так как я ответил. Принятый ответ тяжелый, но будет делать то, что задается. –