2016-02-22 3 views
8

Я хочу проанализировать столбцы даты в DataFrame, и для каждого столбца даты разрешение на дату может измениться (то есть 2011/01/10 => 2011/01, если для разрешения установлено значение «Месяц»).Как передать дополнительные параметры UDF в SparkSql?

Я написал следующий код:

def convertDataFrame(dataframe: DataFrame, schema : Array[FieldDataType], resolution: Array[DateResolutionType]) : DataFrame = 
{ 
    import org.apache.spark.sql.functions._ 
    val convertDateFunc = udf{(x:String, resolution: DateResolutionType) => SparkDateTimeConverter.convertDate(x, resolution)} 
    val convertDateTimeFunc = udf{(x:String, resolution: DateResolutionType) => SparkDateTimeConverter.convertDateTime(x, resolution)} 

    val allColNames = dataframe.columns 
    val allCols = allColNames.map(name => dataframe.col(name)) 

    val mappedCols = 
    { 
    for(i <- allCols.indices) yield 
    { 
     schema(i) match 
     { 
     case FieldDataType.Date => convertDateFunc(allCols(i), resolution(i))) 
     case FieldDataType.DateTime => convertDateTimeFunc(allCols(i), resolution(i)) 
     case _ => allCols(i) 
     } 
    } 
    } 

    dataframe.select(mappedCols:_*) 

}} 

Однако это не работает. Кажется, что я могу передавать только Column s в UDF. И интересно, будет ли это очень медленно, если я конвертирую DataFrame в RDD и применяю функцию к каждой строке.

Кто-нибудь знает правильное решение? Спасибо!

ответ

25

Просто используйте немного выделки:

def convertDateFunc(resolution: DateResolutionType) = udf((x:String) => 
    SparkDateTimeConverter.convertDate(x, resolution)) 

и использовать его следующим образом:

case FieldDataType.Date => convertDateFunc(resolution(i))(allCols(i)) 

На стороне записки вы должны смотреть на sql.functions.trunc и sql.functions.date_format. Они должны по крайней мере частично работать без использования UDF.

Примечание:

В Спарк 2.2 или более поздней версии вы можете использовать typedLit функция:

import org.apache.spark.sql.functions.typedLit 

, которые поддерживают более широкий диапазон литералов как Seq или Map.

+1

Спасибо за ваш ответ и интуицию выделки! – DarkZero

+4

Я написал учебник о том, как использовать currying для создания Spark UDF, который принимает дополнительные параметры во время вызова. https://gist.github.com/andrearota/5910b5c5ac65845f23856b2415474c38 –

10

Вы можете создать литерал Column перейти к UDF с помощью функции lit(...), определенной в org.apache.spark.sql.functions

Например:

val takeRight = udf((s: String, i: Int) => s.takeRight(i)) 
df.select(takeRight($"stringCol", lit(1))) 
+1

Спасибо, я изначально использовал 'lit', но, оказывается, его производительность не так хороша, как другой ответ ... – DarkZero

Смежные вопросы