2016-11-15 3 views
2

Я хочу написать обычай Transformer для трубопровода в искровом 2.0 в scala. Пока мне не совсем ясно, как должны возвращаться методы copy или transformSchema. Правильно ли, что они возвращают null? https://github.com/SupunS/play-ground/blob/master/test.spark.client_2/src/main/java/CustomTransformer.java для копирования?Как написать собственный трансформатор в MLlib?

Как Transformer extends PipelineStage Я заключаю, что a fit называет метод transformSchema. Правильно ли я понимаю, что transformSchema похож на sk-learns?

Как мой Transformer должен присоединиться к набору данных с (очень маленьким) вторым набором данных. Я хочу сохранить этот вариант в сериализованном конвейере. Как я должен хранить это в трансформаторе для правильной работы с механизмом сериализации конвейеров?

Как бы выглядел простой трансформатор, который вычисляет среднее значение для одного столбца и заполняет значения nan + сохраняется это значение?

@SerialVersionUID(serialVersionUID) // TODO store ibanList in copy + persist 
    class Preprocessor2(someValue: Dataset[SomeOtherValues]) extends Transformer { 

     def transform(df: Dataset[MyClass]): DataFrame = { 

     } 

     override def copy(extra: ParamMap): Transformer = { 
     } 

     override def transformSchema(schema: StructType): StructType = { 
     schema 
     } 
    } 

ответ

2

transformSchema должен возвращать схему, которая, как ожидается, после применения Transformer. Пример:

  • Если трансформаторные добавляет столбец IntegerType, и имя выходного столбца foo:

    import org.apache.spark.sql.types._ 
    
    override def transformSchema(schema: StructType): StructType = { 
        schema.add(StructField("foo", IntegerType)) 
    } 
    

Итак, если схема не изменяется для набора данных, как только значение имени заполняется для среднего вменения. Я должен вернуть исходный класс case в качестве схемы?

Это не представляется возможным в Спарк SQL (и MLlib тоже), т.к. Dataset является неизменны после создания. Вы можете добавлять или «заменять» (что добавляется, а затем drop операций).

+0

вы имеете в виду трансформатор, который падает/добавляет новые столбцы должны быть оценщик? Это звучит странно. Поэтому я понимаю, что правильный трансформатор sklearn с fit & transform является оценкой искры, искровой трансформатор может выполнять только «фиксированные» преобразования, которые являются постоянными для любых входных данных. Поскольку такой средний компьютер должен быть оценщиком? –

+0

Как @LostInOverflow сказал, что вам нужна оценка, а затем трансформатор - где вычисление вычисляет среднее значение из исходного столбца, а затем трансформатор вменяет недостающие значения с вычисленным средним значением. Кроме того, недостающее значение вменения является функцией, которая в настоящее время находится в кулинарии - [JIRA] (https://issues.apache.org/jira/browse/SPARK-13568) – ShirishT

+0

Спасибо. Правильно ли, что любая переменная в поправке оценщика, которая должна храниться/сохраняться, на самом деле будет сохраняться? –

2

Прежде всего, я не уверен, что вы хотите Transformer сами по себе (или UnaryTransformer, как @LostInOverflow suggested in the answer), как вы сказали:

Как бы простой вид трансформатора, как, который вычисляет среднее значение для одного столбца и заполняет значения nan + сохраняется это значение?

Для меня это похоже на то, что вы хотите применить агрегатную функцию (aka aggregation) и «соединить» ее со всеми столбцами для получения конечного значения или NaN.

Это выглядит как вы хотите groupBy сделать агрегацию для mean, а затем join, которые могут быть агрегацией окна тоже.

Во всяком случае, я хотел бы начать с UnaryTransformer, который позволит решить первую проблему в вашем вопросе:

До сих пор не совсем понятно, для меня то, что copy или transformSchema методы должны вернуться. Правильно ли, что они возвращают нуль?

См the complete project spark-mllib-custom-transformer at GitHub, в котором я реализовал столбец UnaryTransformer на toUpperCase строку, которая для UnaryTransformer выглядит следующим образом:

import org.apache.spark.ml.UnaryTransformer 
import org.apache.spark.ml.util.Identifiable 
import org.apache.spark.sql.types.{DataType, StringType} 

class UpperTransformer(override val uid: String) 
    extends UnaryTransformer[String, String, UpperTransformer] { 

    def this() = this(Identifiable.randomUID("upp")) 

    override protected def createTransformFunc: String => String = { 
    _.toUpperCase 
    } 

    override protected def outputDataType: DataType = StringType 
} 
Смежные вопросы