У меня есть задача celery, которая обрабатывает каждую строку в супербольшем текстовом файле параллельно. У меня также есть задача сельдерея, которая должна запускаться после обработки каждой строки - она объединяет и обрабатывает выходные данные каждой строки. Поскольку это такие огромные массивы данных, с которыми я работаю, есть ли способ, которым я могу работать с сельдереем с генераторами, в отличие от списков?Как передать генератор в celery.chord вместо списка?
def main():
header_generator = (processe.s(line) for line in file)
callback = finalize.s()
# Want to loop through header_generator and kick off tasks
chord(header_generator)(callback)
@celery.task
def process(line):
# do stuff with line, return output
return output
@celery.task
def finalize(output_generator):
# Want to loop through output_generator and process the output
for line in output_generator:
# do stuff with output
# do something to signal the completion of the file
Если это невозможно - без использования дерева сельдерея - есть ли другая стратегия, которую кто-то может порекомендовать?
Есть ли [это сообщение] (http://stackoverflow.com/questions/17052291/reporting-yielded-results-of-long-running-celery-task) help? –