2017-01-13 3 views
1

Я несколько задач, что-то вроде этого:сельдерей задача же график с офсетной

CELERYBEAT_SCHEDULE = { 
    'task1': { 
     'task': 'api.tasks.task1', 
     'schedule': timedelta(seconds=10), 
    }, 
    'task2': { 
     'task': 'api.tasks.task2', 
     'schedule': timedelta(seconds=30), 
    }, 
    'task3': { 
     'task': 'api.tasks.task3', 
     'schedule': timedelta(seconds=15), 
    }, 
    ... 
} 

Так, task1 будет работать в *: *: 10 *: *: 20 *: *: 30, *: *: 40 *: *: 50 и *: *: 00

task2 будет работать в *: *: 30 и *: *: 00

Task3 будет работать в *: *: 15, *: *: 30, *: *: 45 и *: *: 00

Тогда задачи всегда согласуются *: *: 30 и *: *: 00. Есть ли способ добавить смещение. Я хочу получить что-то вроде этого:

task1 (offset = 2) run in *: *: 12, *: *: 22, *: *: 32, *: *: 42, *: *: 52 и *: *: 02

task2 (смещение = 7) выполняются в *: *: 37 и *: *: 07

Task3 (смещение = 0) выполняются в *: *: 15, *: *: 30, *: *: 45 и *: *: 00

Я прочитал документацию, и я думаю, что я должен использовать crontab, но разве нет другого способа более приятного? И кронтаб не имеет для конфигурации секунд :-(

+1

Я реализовал один раз [schedulesince] (https://gist.github.com/glowka/a3936877cb754d30e3ff753e1b94d67b) ('timedelta (секунды = 10) 'является обычно преобразуется в 'schedule (timedelta (seconds = 10))'). Это не совсем то, о чем вы просили, но, возможно, это помогает. – glowka

+0

Спасибо @glowka Я могу использовать его как пример ... но да, это не решает мою проблему :-( – Goin

+0

На ваш вопрос? –

ответ

1

Вы можете решить эту проблему, используя следующие шаги:

1.You не нужно добавлять CELERYBEAT_SCHEDULE на файл settings.py

2.In __init__.py файл в api приложение добавить ниже код:

import tasks 

3.And затем на tasks.py файла:

from datetime import datetime 

from celery import Celery 


app = Celery() 
run_id = None 

@app.task 
def task1(): 
    print('every 10 seconds:', datetime.now().second) 

@app.task 
def task2(): 
    print('every 30 seconds:', datetime.now().second) 

@app.task 
def task3(): 
    print('every 15 seconds:', datetime.now().second) 

@app.task 
def run(sender): 
    global app, run_id 
    # Schedule other tasks 
    sender.add_periodic_task(10.0, task1.s()) 
    sender.add_periodic_task(30.0, task2.s()) 
    sender.add_periodic_task(15.0, task3.s()) 
    # Stop self running later times 
    app.control.revoke(run_id) 

@app.on_after_configure.connect 
def setup_periodic_tasks(sender, **kwargs): 
    global run_id 
    now = datetime.now() 
    run_id = sender.add_periodic_task((30 if now.second < 30 else 60) - now.second, run.s(sender)) 
0

По celery documentation:

Вы также можете определить свои собственные типы графика, расширяя интерфейс schedule .

Так вот мое решение:

from datetime import timedelta 

from celery import Celery 
from celery.schedules import schedule 


class MySchedule(schedule): 
    def __init__(self, run_every=None, offset=None): 
     self._run_every = run_every 
     self._offset = offset if offset is not None else timedelta(seconds=0) 
     self._do_offset = True if self._offset else False 
     super(MySchedule, self).__init__(
      run_every=self._run_every + self._offset) 

    def is_due(self, last_run_at): 
     ret = super(MySchedule, self).is_due(last_run_at) 
     if self._do_offset and ret.is_due: 
      self._do_offset = False 
      self.run_every = self._run_every 
      ret = super(MySchedule, self).is_due(last_run_at) 
     return ret 

    def __reduce__(self): 
     return self.__class__, (self._run_every, self._offset) 


app = Celery('tasks', broker='pyamqp://[email protected]//') 

app.conf.beat_schedule = { 
    'task1': { 
     'task': 'tasks.task1', 
     'schedule': MySchedule(
      run_every=timedelta(seconds=10), offset=timedelta(seconds=2)), 
    }, 
    'task2': { 
     'task': 'tasks.task2', 
     'schedule': MySchedule(
      run_every=timedelta(seconds=30), offset=timedelta(seconds=7)), 
    }, 
    'task3': { 
     'task': 'tasks.task3', 
     'schedule': MySchedule(
      run_every=timedelta(seconds=15), offset=timedelta(seconds=0)), 
    }, 
} 


@app.task 
def task1(): 
    print('task1') 


@app.task 
def task2(): 
    print('task2') 


@app.task 
def task3(): 
    print('task3') 

Вы можете написать свой собственный MySchedule и продлить его от BaseSchedule иметь больше контроля.

-1

Я попытался решить эту проблему с решением, немного отличающимся от решения vasi1y. Но это решение и предыдущие решения не работают ...

class schedule_offset(schedule): 

    def __init__(self, run_every=None, offset=None, 
       relative=False, nowfun=None, app=None): 
     self._run_every = run_every 
     if offset is None: 
      offset = 0 
     self._offset = maybe_timedelta(offset) 
     self._executing = 0 
     super(schedule_offset, self).__init__(
      run_every=self._run_every, relative=relative, nowfun=nowfun, app=app) 

    def is_due(self, last_run_at): 
     last_run_at = last_run_at + self._offset 
     last_run_at = self.maybe_make_aware(last_run_at) 
     rem_delta = self.remaining_estimate(last_run_at) 
     remaining_s = timedelta_seconds(rem_delta) 
     if remaining_s == 0: 
      ret = schedstate(is_due=True, next=self.seconds + self._offset.seconds) 
      if self._executing < 2: 
       self._executing += 1 
       if self._executing == 2: 
        self._offset = maybe_timedelta(0) 
      return ret 
     return schedstate(is_due=False, next=remaining_s) 

    def __reduce__(self): 
     return self.__class__, (self._run_every, self._offset, self.relative, self.nowfun) 


CELERYBEAT_SCHEDULE = { 
    'task1': { 
     'task': 'api.tasks.task1', 
     'schedule': schedule_offset(timedelta(seconds=10), offset=2), 
    }, 
    'task2': { 
     'task': 'api.tasks.task2', 
     'schedule': schedule_offset(timedelta(seconds=30), offset=7), 
    }, 
    'task3': { 
     'task': 'api.tasks.task3', 
     'schedule': timedelta(seconds=15), 
    }, 
    ... 
} 
Смежные вопросы