2017-02-15 5 views
3

Я хотел бы написать функцию yeardiff, которая работает аналогично datediff. yeardiff должен принимать два аргумента Column и возвращать столбец с количеством лет между этими параметрами Columns.Написание функций Spark, которые принимают аргументы столбца и возвращают столбец

Давайте использовать следующий образец данных:

val testDf = Seq(
    ("2016-09-10", "2001-08-10"), 
    ("2016-04-18", "2010-05-18"), 
    ("2016-01-10", "2013-08-10") 
) 
    .toDF("first_datetime", "second_datetime") 
    .withColumn("first_datetime", $"first_datetime".cast("timestamp")) 
    .withColumn("second_datetime", $"second_datetime".cast("timestamp")) 

Мы можем запустить это, чтобы получить разницу в дате:

testDf.withColumn("num_days", datediff(col("first_datetime"), col("second_datetime"))) 

Я хочу, чтобы иметь возможность управлять этим:

testDf.withColumn("num_years", yeardiff(col("first_datetime"), col("second_datetime"))) 

Я попытался определить функцию yeardiff с необходимой сигнатурой метода и не получил никуда:

def yeardiff(end: Column, start: Column): Column = { 
    // what do I do here 
}  

Вот взломанный решение преобразование, которое я придумал и не нравится:

def yearDiff(end: String, start: String)(df: DataFrame): DataFrame = { 
    val c = s"${end}_${start}_datediff" 
    df 
    .withColumn(c, datediff(col(end), col(start))) 
    .withColumn("yeardiff", col(c)/365) 
} 

EDIT

Я начал копаться в исходном коде Спарк, чтобы увидеть, как datediff работы. Вот the datediff function definition:

def datediff(end: Column, start: Column): Column = withExpr { DateDiff(end.expr, start.expr) } 

Здесь the DateDiff case class:

case class DateDiff(endDate: Expression, startDate: Expression) 
    extends BinaryExpression with ImplicitCastInputTypes { 

    override def left: Expression = endDate 
    override def right: Expression = startDate 
    override def inputTypes: Seq[AbstractDataType] = Seq(DateType, DateType) 
    override def dataType: DataType = IntegerType 

    override def nullSafeEval(end: Any, start: Any): Any = { 
    end.asInstanceOf[Int] - start.asInstanceOf[Int] 
    } 

    override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = { 
    defineCodeGen(ctx, ev, (end, start) => s"$end - $start") 
    } 
} 

ответ

2

Это может решить вашу проблему:

def yearDiff(end: Column, start: Column): Column = { 
    datediff(end, start)/365 
} 
+0

Это хорошая работа, но вам нужно изменить порядок аргументов на 'lateiff (end, start)/365'. – Powers

0

Мы можем использовать встроенный в год функции и UDF для корректировки случаев где месяц не прошел.

  def yeardiff(end: Column, start: Column): Column = { 
       def getAdjustment(monthStart : Int, monthEnd : Int, dayStart : Int, dayEnd : Int) : Int = { 
       if (monthEnd>monthStart) return -1 
       if (monthStart==monthEnd && dayEnd > dayStart) return -1 
       else return 0 
       } 
       val udfGetAdjustment = udf[Int,Int,Int,Int,Int](getAdjustment) 
       val adj = udfGetAdjustment(month(start),month(end),dayofmonth(start),dayofmonth(end)) 
       year(end) - year(start) + adj 
      } 
Смежные вопросы