2014-10-27 5 views
1

Я использую сельдерей в проекте, где я использую его как планировщик (как периодическую задачу).Как настроить задачу сельдерея

Моя сельдаяся задача выглядит следующим образом:

@periodic_task(run_every=timedelta(seconds=300)) 
def update_all_feed(): 
    feed_1() 
    feed_2() 
    ........... 
    feed_n() 

Но поскольку число каналов увеличивается, оно занимает много времени, чтобы добраться до других каналов (например, когда Сельдерей работает с номером подачи п занимает много времени .., чтобы добраться до следующего кормления (п + 1) Я хочу, чтобы использовать параллелизм сельдерея для запуска нескольких каналов

После прохождения документации, я нашел, что я могу назвать задачу сельдерея, как показано ниже:

feed.delay() 

Как я могу настроить сельдерей так, чтобы он получал все кормовые ids и собирал их (например, например, по 5 каналов за раз)? Я понимаю, что для этого я должен буду использовать Сельдерей в качестве демона.

N.B: Я использую mongodb в качестве брокера, все, что я сделал, это установить его и добавить URL-адрес в конфигурацию сельдерея.

ответ

0

Вы можете планировать все свои каналы, как этот

@periodic_task(run_every=timedelta(seconds=300)) 
def update_all_feed(): 
    feed_1.delay() 
    feed_2.delay() 
    ....... 
    feed_n.delay() 

или вы можете использовать группу так упрощать

from celery import group 
@periodic_task(run_every=timedelta(seconds=300)) 
def update_all_feed(): 
    group(feed.delay(i) for i in range(10)) 

Теперь для запуска задач, которые вы можете создать рабочий для выполнения задач

celery worker -A your_app -l info --beat 

Это начинает выполнение вашей задачи каждые пять минут. Однако параллелизм по умолчанию равен ядрам вашего процессора. Вы также можете изменить параллелизм. Если вы хотите выполнить 10 заданий в то время, одновременно затем

celery worker -A your_app -l info --beat -c 10 
0

От Celery documentation:

from celery.task.sets import TaskSet 
from .tasks import feed, get_feed_ids 

job = TaskSet(tasks=[ 
     feed.subtask((feed_id,)) for feed_id in get_feed_ids() 
    ]) 

result = job.apply_async() 
results = result.join() # There's more in the documentation 
Смежные вопросы