У меня есть количество файлов pickle, по одному на каждую дату между 2005 и 2010 годами. Каждый файл содержит словарь слов со своими соответствующими частотами для этой даты. У меня также есть «главный файл» со всеми уникальными словами за весь период. Всего около 5 миллионов слов.Архитектура для задач luigi с несколькими входами
Мне нужно взять все эти данные и создать один CSV-файл на каждое слово, которое будет содержать одну строку на дату. НАПРИМЕР, например файл some_word.txt
:
2005-01-01,0.0003
2005-01-02,0.00034
2005-01-03,0.008
У меня возникли проблемы по организации этого процесса с каркасом Luigi. Моя текущая задача верхнего уровня берет слово, просматривает связанную с ним частоту для каждой даты и сохраняет результат в файле CSV. Думаю, я мог бы просто пропустить каждое слово в моем основном файле и запустить задачу с этим словом, но, по моим оценкам, потребуется несколько месяцев, если не больше. Вот моя задача верхнего уровня AggregateTokenFreqs
в упрощенной версии.
class AggregateTokenFreqs(luigi.Task):
word = luigi.Parameter()
def requires(self):
pass # not sure what to require here, master file?
def output(self):
return luigi.LocalTarget('data/{}.csv'.format(self.word))
def run(self):
results = []
for date_ in some_list_of_dates:
with open('pickles/{}.p'.format(date_), 'rb') as f:
freqs = pickle.load(f)
results.append((date_, freqs.get(self.word))
# Write results list to output CSV file
Какова текущая обработка, которую вам нужно сделать? Например, ваш план повторного запуска ежедневного процесса, когда поступают данные для нового дня? Если вам нужно только запустить его один раз, вероятно, нет смысла запускать luigi. В любом случае вам будет лучше использовать многопроцессорность. – MattMcKnight