2016-12-13 3 views
5

Я использую Celery 4.0.1 с Django 1.10, и у меня есть задачи планирования задач (выполнение задачи работает нормально). Вот конфигурация сельдерея:Динамическое использование периодических задач в сельдерее (celerybeat) с использованием add_periodic_task

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'myapp.settings') 
app = Celery('myapp') 

app.autodiscover_tasks(lambda: settings.INSTALLED_APPS) 

app.conf.BROKER_URL = 'amqp://{}:{}@{}'.format(settings.AMQP_USER, settings.AMQP_PASSWORD, settings.AMQP_HOST) 
app.conf.CELERY_DEFAULT_EXCHANGE = 'myapp.celery' 
app.conf.CELERY_DEFAULT_QUEUE = 'myapp.celery_default' 
app.conf.CELERY_TASK_SERIALIZER = 'json' 
app.conf.CELERY_ACCEPT_CONTENT = ['json'] 
app.conf.CELERY_IGNORE_RESULT = True 
app.conf.CELERY_DISABLE_RATE_LIMITS = True 
app.conf.BROKER_POOL_LIMIT = 2 

app.conf.CELERY_QUEUES = (
    Queue('myapp.celery_default'), 
    Queue('myapp.queue1'), 
    Queue('myapp.queue2'), 
    Queue('myapp.queue3'), 
) 

Тогда в tasks.py у меня есть:

@app.task(queue='myapp.queue1') 
def my_task(some_id): 
    print("Doing something with", some_id) 

В views.py я хочу запланировать эту задачу:

def my_view(request, id): 
    app.add_periodic_task(10, my_task.s(id)) 

Тогда я исполняю команды:

sudo systemctl start rabbitmq.service 
celery -A myapp.celery_app beat -l debug 
celery worker -A myapp.celery_app 

Но задача никогда не планировалось. Я ничего не вижу в журналах. Задача работает, потому что, если, на мой взгляд, я:

def my_view(request, id): 
    my_task.delay(id) 

Задача выполнена.

Если в моем файле конфигурации, если я запланировать задачу вручную, как это работает:

app.conf.CELERYBEAT_SCHEDULE = { 
    'add-every-30-seconds': { 
     'task': 'tasks.my_task', 
     'schedule': 10.0, 
     'args': (66,) 
    }, 
} 

Я просто не могу запланировать задачу динамически. Есть идеи?

ответ

14

На самом деле вы не можете не определить периодическую задачу на уровне представления, поскольку настройка расписания биений будет загружена первой и не может быть перенесена во время выполнения:

add_periodic_task() функция добавит запись к beat_schedule опускалось за кулисами, и тот же параметр может также может быть использован для создания периодических задач вручную:

app.conf.CELERYBEAT_SCHEDULE = { 
    'add-every-30-seconds': { 
     'task': 'tasks.my_task', 
     'schedule': 10.0, 
     'args': (66,) 
    }, 
} 

что означает, если вы хотите использовать add_periodic_task() он должен быть обернут withi п on_after_configure обработчик на уровне сельдерей приложения и любые изменения во время выполнения не вступит в силу:

app = Celery() 

@app.on_after_configure.connect 
def setup_periodic_tasks(sender, **kwargs): 
    sender.add_periodic_task(10, my_task.s(66)) 

Как уже упоминалось в doc регулярное celerybeat просто следить за выполнением задач:

Значение по умолчанию scheduler - это celery.beat.PersistentScheduler, который просто отслеживает последнее время выполнения в файле базы данных локальной полки.

Для того, чтобы иметь возможность динамически управлять периодическими задачами и перепланировать celerybeat во время выполнения:

Там также django-celery-beat расширения, которое сохраняет график в базе данных Django, и представляет собой удобный интерфейс администратора в управлять периодическими задачами во время выполнения.

Задачи будут сохраняться в базе данных django, и планировщик может быть обновлен в модели задач на уровне db.Всякий раз, когда вы обновляете периодическую задачу, счетчик в этой таблице задач будет увеличиваться и сообщает службе билда обрезания, чтобы перезагрузить расписание из базы данных.

Возможным решением для вас может быть следующим:

from django_celery_beat.models import PeriodicTask, IntervalSchedule 

schedule= IntervalSchedule.objects.create(every=10, period=IntervalSchedule.SECONDS) 
task = PeriodicTask.objects.create(interval=schedule, name='any name', task='tasks.my_task', args=json.dumps([66])) 

views.py

def update_task_view(request, id) 
    task = PeriodicTask.objects.get(name="task name") # if we suppose names are unique 
    task.args=json.dumps([id]) 
    task.save() 


EDIT: (13/01/2018)


последняя release 4.1.0 обратились предмет в этом ticket #3958 и были объединены

+0

Подобрав на ваш комментарий «На самом деле вы не можете не определить периодическую задачу на уровне представления»: Можно ли используйте 'add_periodic_task()' на уровне приложения, то есть в 'task.py'? Кажется, лучше инкапсулировать, чтобы эти периодические задачи были объявлены в приложении. –

+0

На самом деле вовсе не нужно его использовать, поскольку он будет вызываться для вас, если вы просто используете синтаксис настроек 'app.conf.CELERYBEAT_SCHEDULE', но если вы хотите использовать его явно, вы можете использовать его в' task.py' файл. – DhiaTN

+2

Я считаю, что последняя версия (после версии 4.1.0) должна иметь этот адрес. Вот разработчик, который работает [# 3958] (https://github.com/celery/celery/pull/3958) –

Смежные вопросы