2016-07-19 3 views
0

Я столкнулся с странные проблемы с Scala/Спарк (1.5) и Цеппелин:Задача не сериализации, используя пользовательский класс dataframe в Спарк Scala

Если я запускаю следующий Scala-код/​​Спарк, он будет работать должным образом:

// TEST NO PROBLEM SERIALIZATION 
val rdd = sc.parallelize(Seq(1, 2, 3)) 
val testList = List[String]("a", "b") 

rdd.map{a => 
    val aa = testList(0) 
    None} 

Однако после объявления пользовательского типа dataframe как предложено here

//DATAFRAME EXTENSION 
import org.apache.spark.sql.DataFrame 

object ExtraDataFrameOperations { 
    implicit class DFWithExtraOperations(df : DataFrame) { 

    //drop several columns 
    def drop(colToDrop:Seq[String]):DataFrame = { 
     var df_temp = df 
     colToDrop.foreach{ case (f: String) => 
      df_temp = df_temp.drop(f)//can be improved with Spark 2.0 
     } 
     df_temp 
    } 
    } 
} 

и использовать его, например, как следующее:

//READ ALL THE FILES INTO different DF and save into map 
import ExtraDataFrameOperations._ 
val filename = "myInput.csv" 

val delimiter = "," 

val colToIgnore = Seq("c_9", "c_10") 

val inputICFfolder = "hdfs:///group/project/TestSpark/" 

val df = sqlContext.read 
      .format("com.databricks.spark.csv") 
      .option("header", "true") // Use first line of all files as header 
      .option("inferSchema", "false") // Automatically infer data types? => no cause we need to merge all df, with potential null values => keep string only 
      .option("delimiter", delimiter) 
      .option("charset", "UTF-8") 
      .load(inputICFfolder + filename) 
      .drop(colToIgnore)//call the customize dataframe 

Этот пробег успешно завершен.

Теперь, если я снова запустить следующий код (то же самое, что и выше)

// TEST NO PROBLEM SERIALIZATION 
val rdd = sc.parallelize(Seq(1, 2, 3)) 
val testList = List[String]("a", "b") 
rdd.map{a => 
    val aa = testList(0) 
    None} 

Я получаю сообщение об ошибке:

рдд: org.apache.spark.rdd.RDD [Int] = ParallelCollectionRDD [8] at Распараллеливать по: 32 testList: List [String] = List (a, b) org.apache.spark.SparkException: Задача не может быть сериализована в org.apache.spark.util.ClosureCleaner $ .ensureSerializable (ClosureCleaner.scala: 304) по адресу org.apache.spark.util.ClosureCleaner $ .org $ apache $ spark $ util $ ClosureCleaner $$ clean (ClosureCleaner.scala: 294) at org.apache.spark.util.ClosureCleaner $ .clean (ClosureCleaner.scala : 122) at org.apache.spark.SparkContext.clean (SparkContext.scala: 2032) at org.apache.spark.rdd.RDD $$ anonfun $ map $ 1.apply (RDD.scala: 314) .. Вызвать: java.io.NotSerializableException: $ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $ ExtraDataFrameOperations $ Сериализация стека: - объект не сериализуется (класс: $ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $ ExtraDataFrameOperations $, значение: $ iwC $$ iwC $$ iwC $$ iwC $$ IWC IWC $$ $$ $$ IWC IWC IWC $$ $ ExtraDataFrameOperatio ns $ @ 6c7e70e) - поле (класс: $ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC, имя: модуль ExtraDataFrameOperations $, тип: class $ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $ ExtraDataFrameOperations $) - объект (класс $ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC поле $$ iwC $$ iwC $$ iwC, $ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC @ 4c6d0802) - поле (класс: $ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC, имя: $ iw, type: class $ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ IWC $$ IWC) ...

Я не понимаю:

  • Почему эта ошибка возникла, если не выполнялась операция в кадре данных?
  • Почему «ExtraDataFrameOperations» не является сериализуемым, пока он был успешно использован раньше?

UPDATE:

Попытка с

@inline val testList = List[String]("a", "b") 

не помогает.

ответ

0

Похоже, что искра пытается сериализовать всю область вокруг testList. Попробуйте встроенные данные @inline val testList = List[String]("a", "b") или используйте другой объект, в котором вы храните функцию/данные, которые вы передаете драйверам.

+0

К сожалению, _ @ inline_ не помогает – user2573552

+0

И сохранение функции/данных в другом объекте не соответствует стратегии для настройки объекта dataframe – user2573552

Смежные вопросы