2015-06-09 1 views
2

У меня есть задача celery, которая обрабатывает каждую строку в супербольшем текстовом файле параллельно. У меня также есть задача сельдерея, которая должна запускаться после обработки каждой строки - она ​​объединяет и обрабатывает выходные данные каждой строки. Поскольку это такие огромные массивы данных, с которыми я работаю, есть ли способ, которым я могу работать с сельдереем с генераторами, в отличие от списков?Как передать генератор в celery.chord вместо списка?

def main(): 
    header_generator = (processe.s(line) for line in file) 
    callback = finalize.s() 
    # Want to loop through header_generator and kick off tasks 
    chord(header_generator)(callback) 

@celery.task 
def process(line): 
    # do stuff with line, return output 
    return output 

@celery.task 
def finalize(output_generator): 
    # Want to loop through output_generator and process the output 
    for line in output_generator: 
     # do stuff with output 
    # do something to signal the completion of the file 

Если это невозможно - без использования дерева сельдерея - есть ли другая стратегия, которую кто-то может порекомендовать?

+0

Есть ли [это сообщение] (http://stackoverflow.com/questions/17052291/reporting-yielded-results-of-long-running-celery-task) help? –

ответ

0

На момент написания этой статьи генераторы передавались группам, и аккорды сразу расширялись. У меня была аналогичная проблема, поэтому я добавил ее поддержку и создал запрос на тягу против celery 3.x: https://github.com/celery/celery/pull/3043

В настоящее время поддерживается только redis. Надеемся, что PR будет слит до того, как будет выпущен сельдерей 3.

Смежные вопросы