2016-04-22 2 views
10

я получаю org.apache.spark.SparkException: Task not serializable, когда я пытаюсь выполнить следующие действия на Спарк 1.4.1:Spark: Задача не Сериализуемый для UDF на DataFrame

import java.sql.{Date, Timestamp} 
import java.text.SimpleDateFormat 

object ConversionUtils { 
    val iso8601 = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSX") 

    def tsUTC(s: String): Timestamp = new Timestamp(iso8601.parse(s).getTime) 

    val castTS = udf[Timestamp, String](tsUTC _) 
} 

val df = frame.withColumn("ts", ConversionUtils.castTS(frame("ts_str"))) 
df.first 

Здесь frame является DataFrame, который живет в HiveContext. В этом фрейме данных нет никаких проблем.

У меня есть аналогичные UDF для целых чисел, и они работают без проблем. Тем не менее, проблема со временными метками создает проблемы. Согласно documentation, java.sql.TimeStamp реализует Serializable, так что это не проблема. То же самое верно для SimpleDateFormat как можно видеть here.

Это заставляет меня поверить, что это проблема UDF. Однако я не уверен, что и как это исправить.

Соответствующий раздел следа:

Caused by: java.io.NotSerializableException: ... 
Serialization stack: 
     - object not serializable (class: ..., value: [email protected]) 
     - field (class: ...$ConversionUtils$$anonfun$3, name: $outer, type: class ...$ConversionUtils$) 
     - object (class ...$ConversionUtils$$anonfun$3, <function1>) 
     - field (class: org.apache.spark.sql.catalyst.expressions.ScalaUdf$$anonfun$2, name: func$2, type: interface scala.Function1) 
     - object (class org.apache.spark.sql.catalyst.expressions.ScalaUdf$$anonfun$2, <function1>) 
     - field (class: org.apache.spark.sql.catalyst.expressions.ScalaUdf, name: f, type: interface scala.Function1) 
     - object (class org.apache.spark.sql.catalyst.expressions.ScalaUdf, scalaUDF(ts_str#2683)) 
     - field (class: org.apache.spark.sql.catalyst.expressions.Alias, name: child, type: class org.apache.spark.sql.catalyst.expressions.Expression) 
     - object (class org.apache.spark.sql.catalyst.expressions.Alias, scalaUDF(ts_str#2683) AS ts#7146) 
     - element of array (index: 35) 
     - array (class [Ljava.lang.Object;, size 36) 
     - field (class: scala.collection.mutable.ArrayBuffer, name: array, type: class [Ljava.lang.Object;) 
     - object (class scala.collection.mutable.ArrayBuffer, 

ответ

14

Try:

object ConversionUtils extends Serializable { 
    ... 
} 
+3

Людей, я чувствую себя так глупо сейчас ... Спасибо! – Ian

+12

Это заставит вас чувствовать себя лучше, когда вы примете мой ответ ';-)' –

+0

Не то чтобы это важно, но мне стало лучше видеть этот вопрос и ответ, размещенный здесь на SO. Проголосовали, спасибо! –

Смежные вопросы