2016-08-24 3 views
0

У меня есть задача Луиджи, что requires подзадача. Подзадача зависит от параметров, переданных родительской задачей (т. Е. Той, которая выполняет команду require). Я знаю, что вы можете указать параметр, который подзадачи можно использовать, установив ...Как передать несколько аргументов в подзадачу Luigi?

def requires(self): 
    return subTask(some_parameter) 

... потом на подзадачи, получающей параметр путем установки ...

x = luigi.Parameter() 

Это появляется только чтобы вы могли пройти через один параметр. Каков наилучший способ для отправки через произвольное количество параметров любых типов, которые я хочу? На самом деле я хочу что-то вроде этого:

class parentTask(luigi.Task): 

    def requires(self): 
     return subTask({'file_postfix': 'foo', 
         'file_content': 'bar' 
     }) 

    def run(self): 
     return 


class subTask(luigi.Task): 
    params = luigi.DictParameter() 

    def output(self): 
     return luigi.LocalTarget("file_{}.csv".format(self.params['file_postfix'])) 

    def run(self): 
     with self.output().open('w') as f: 
      f.write(self.params['file_content']) 

Как вы можете видеть, что я попытался с помощью luigi.DictParameter вместо прямой luigi.Parameter, но я получаю TypeError: unhashable type: 'dict' где-то глубоко внутри Луиджи, когда я бегу выше.

Запуск Python 2.7.11, Луиджи 2.1.1

ответ

2

Каков наилучший способ для отправки через произвольное количество параметров любых типов, которые я хочу?

Лучше использовать именованные параметры, например,

#in requires 
return MySampleSubTask(x=local_x, y=local_y) 

class MySampleSubTask(luigi.Task): 
    x = luigi.Parameter() 
    y = luigi.Parameter() 
0

Хорошо, так что я нашел, что это работает, как ожидалось в питона 3.5 (и проблема все еще существует в 3.4).

У нас нет времени, чтобы добраться до сегодняшнего дня, поэтому никаких дополнительных данных.

0

Каков наилучший способ отправить через произвольное число параметров, из любого типа я хочу?

Вы могли бы следовать этому примеру. у вас будет один заполнитель для определения всех параметров, необходимых для прохождения (ParameterCollector). Это позволит избежать определения параметров для каждой задачи sincgle, если вам нужно передать параметр подзадачи в случае многих подзадач.

class ParameterCollector(object): 
    param1 = luigi.Parameter() 
    param2 = luigi.Parameter() 

    def collect_params(self): 
     return {'param1': self.param1, 'param2': self.param2} 


class TaskB(ParameterCollector, luigi.Task): 
    def requires(self): 
     return [] 

    def output(self): 
     return luigi.LocalTarget('/tmp/task1_success') 

    def run(self): 
     with self.output().open('w') as f: 
      f.write(self.param1) 


class TaskA(ParameterCollector, luigi.Task): 
    def requires(self): 
     a = TaskB(**self.collect_params()) 
     print(a) 
     return a 

    def output(self): 
     return luigi.LocalTarget('/tmp/task2_success') 

    def run(self): 
     with self.output().open('w') as f: 
      f.write(str([self.param1, self.param2])) 


if __name__ == '__main__': 
    luigi.run() 
Смежные вопросы