2016-11-28 4 views
1

У меня есть количество файлов pickle, по одному на каждую дату между 2005 и 2010 годами. Каждый файл содержит словарь слов со своими соответствующими частотами для этой даты. У меня также есть «главный файл» со всеми уникальными словами за весь период. Всего около 5 миллионов слов.Архитектура для задач luigi с несколькими входами

Мне нужно взять все эти данные и создать один CSV-файл на каждое слово, которое будет содержать одну строку на дату. НАПРИМЕР, например файл some_word.txt:

2005-01-01,0.0003 
2005-01-02,0.00034 
2005-01-03,0.008 

У меня возникли проблемы по организации этого процесса с каркасом Luigi. Моя текущая задача верхнего уровня берет слово, просматривает связанную с ним частоту для каждой даты и сохраняет результат в файле CSV. Думаю, я мог бы просто пропустить каждое слово в моем основном файле и запустить задачу с этим словом, но, по моим оценкам, потребуется несколько месяцев, если не больше. Вот моя задача верхнего уровня AggregateTokenFreqs в упрощенной версии.

class AggregateTokenFreqs(luigi.Task): 
    word = luigi.Parameter() 

    def requires(self): 
     pass # not sure what to require here, master file? 

    def output(self): 
     return luigi.LocalTarget('data/{}.csv'.format(self.word)) 

    def run(self): 
     results = [] 
     for date_ in some_list_of_dates: 
      with open('pickles/{}.p'.format(date_), 'rb') as f: 
       freqs = pickle.load(f) 
       results.append((date_, freqs.get(self.word)) 

     # Write results list to output CSV file 
+1

Какова текущая обработка, которую вам нужно сделать? Например, ваш план повторного запуска ежедневного процесса, когда поступают данные для нового дня? Если вам нужно только запустить его один раз, вероятно, нет смысла запускать luigi. В любом случае вам будет лучше использовать многопроцессорность. – MattMcKnight

ответ

0

@MattMcKnight говорит, что вам может быть лучше использовать многопроцессорную обработку. Однако, если вы хотите использовать Luigi, вот что вы можете сделать:

  • У Luigi есть концепция рабочих, которые вы настраиваете. Это число локального процесса для параллельной работы другой задачи.
  • Вы можете смоделировать задачу вместо «зацикливания» через все соленые огурцы, передать один мазок в задачу (в качестве параметра). Вам нужно будет записать результат в TSV в каталоге с уникальным именем.
  • У них есть цикл, который создает задачу на каждый рассол (дата). Задайте количество рабочих (т. Е. 5). таким образом, вы сможете обрабатывать 5 файлов одновременно.
  • Вам потребуется дополнительная задача, которая «объединяет» все отдельные файлы CSV в один.

Надеюсь, это поможет.

Смежные вопросы