2017-01-24 2 views
0

Я хочу создать собственный трансформатор функций в DataFrame, чтобы добавить столбец, который представляет собой, например, разницу между двумя другими столбцами. Я следовал this question, но трансформатор там работает только на одном столбце. pyspark.ml.Transformer принимает строку в качестве аргумента для inputCol, поэтому, конечно, я не могу указать несколько столбцов.Трансформатор, работающий с несколькими функциями в pyspark.ml

Поэтому в основном то, что я хочу достичь, это _transform() метод, который напоминает это:

def _transform(self, dataset): 
    out_col = self.getOutputCol() 
    in_col = dataset.select([self.getInputCol()]) 

    # Define transformer logic 
    def f(col1, col2): 
     return col1 - col2 
    t = IntegerType() 

    return dataset.withColumn(out_col, udf(f, t)(in_col)) 

Как это возможно сделать?

+0

Не может быть, что 'HasInputCols' (множественное число) является тем, что вы ищете? –

ответ

1

Мне удалось решить проблему, сначала создав Vector из набора функций, над которыми я хочу работать, а затем применив преобразование к вновь созданной векторной функции. Ниже приведен пример кода, как сделать новую функцию, которая является другой из двух других функций:

class MeasurementDifferenceTransformer(Transformer, HasInputCol, HasOutputCol): 

    @keyword_only 
    def __init__(self, inputCol=None, outputCol=None): 
     super(MeasurementDifferenceTransformer, self).__init__() 
     kwargs = self.__init__._input_kwargs 
     self.setParams(**kwargs) 

    @keyword_only 
    def setParams(self, inputCol=None, outputCol=None): 
     kwargs = self.setParams._input_kwargs 
     return self._set(**kwargs) 

    def _transform(self, dataset): 
     out_col = self.getOutputCol() 
     in_col = dataset[self.getInputCol()] 

     # Define transformer logic 
     def f(vector): 
      return float(vector[0] - vector[1]) 
     t = FloatType() 

     return dataset.withColumn(out_col, udf(lambda x: f(x), t)(in_col)) 

Чтобы использовать его, мы первый экземпляр VectorAssembler создать вектор признаков:

pair_assembler = VectorAssembler(inputCols=["col1", "col2"], outputCol="cols_vector") 

Затем мы создаем трансформатор:

pair_transformer = MeasurementDifferenceTransformer(inputCol="cols_vector", outputCol="col1_minus_col2") 

Наконец преобразуют данные:

pairfeats = pair_assembler.transform(df) 
difffeats = pait_transformer.transform(pairfeats)