Я столкнулся с странные проблемы с Scala/Спарк (1.5) и Цеппелин:Задача не сериализации, используя пользовательский класс dataframe в Спарк Scala
Если я запускаю следующий Scala-код/Спарк, он будет работать должным образом:
// TEST NO PROBLEM SERIALIZATION
val rdd = sc.parallelize(Seq(1, 2, 3))
val testList = List[String]("a", "b")
rdd.map{a =>
val aa = testList(0)
None}
Однако после объявления пользовательского типа dataframe как предложено here
//DATAFRAME EXTENSION
import org.apache.spark.sql.DataFrame
object ExtraDataFrameOperations {
implicit class DFWithExtraOperations(df : DataFrame) {
//drop several columns
def drop(colToDrop:Seq[String]):DataFrame = {
var df_temp = df
colToDrop.foreach{ case (f: String) =>
df_temp = df_temp.drop(f)//can be improved with Spark 2.0
}
df_temp
}
}
}
и использовать его, например, как следующее:
//READ ALL THE FILES INTO different DF and save into map
import ExtraDataFrameOperations._
val filename = "myInput.csv"
val delimiter = ","
val colToIgnore = Seq("c_9", "c_10")
val inputICFfolder = "hdfs:///group/project/TestSpark/"
val df = sqlContext.read
.format("com.databricks.spark.csv")
.option("header", "true") // Use first line of all files as header
.option("inferSchema", "false") // Automatically infer data types? => no cause we need to merge all df, with potential null values => keep string only
.option("delimiter", delimiter)
.option("charset", "UTF-8")
.load(inputICFfolder + filename)
.drop(colToIgnore)//call the customize dataframe
Этот пробег успешно завершен.
Теперь, если я снова запустить следующий код (то же самое, что и выше)
// TEST NO PROBLEM SERIALIZATION
val rdd = sc.parallelize(Seq(1, 2, 3))
val testList = List[String]("a", "b")
rdd.map{a =>
val aa = testList(0)
None}
Я получаю сообщение об ошибке:
рдд: org.apache.spark.rdd.RDD [Int] = ParallelCollectionRDD [8] at Распараллеливать по: 32 testList: List [String] = List (a, b) org.apache.spark.SparkException: Задача не может быть сериализована в org.apache.spark.util.ClosureCleaner $ .ensureSerializable (ClosureCleaner.scala: 304) по адресу org.apache.spark.util.ClosureCleaner $ .org $ apache $ spark $ util $ ClosureCleaner $$ clean (ClosureCleaner.scala: 294) at org.apache.spark.util.ClosureCleaner $ .clean (ClosureCleaner.scala : 122) at org.apache.spark.SparkContext.clean (SparkContext.scala: 2032) at org.apache.spark.rdd.RDD $$ anonfun $ map $ 1.apply (RDD.scala: 314) .. Вызвать: java.io.NotSerializableException: $ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $ ExtraDataFrameOperations $ Сериализация стека: - объект не сериализуется (класс: $ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $ ExtraDataFrameOperations $, значение: $ iwC $$ iwC $$ iwC $$ iwC $$ IWC IWC $$ $$ $$ IWC IWC IWC $$ $ ExtraDataFrameOperatio ns $ @ 6c7e70e) - поле (класс: $ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC, имя: модуль ExtraDataFrameOperations $, тип: class $ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $ ExtraDataFrameOperations $) - объект (класс $ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC поле $$ iwC $$ iwC $$ iwC, $ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC @ 4c6d0802) - поле (класс: $ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC, имя: $ iw, type: class $ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ IWC $$ IWC) ...
Я не понимаю:
- Почему эта ошибка возникла, если не выполнялась операция в кадре данных?
- Почему «ExtraDataFrameOperations» не является сериализуемым, пока он был успешно использован раньше?
UPDATE:
Попытка с
@inline val testList = List[String]("a", "b")
не помогает.
К сожалению, _ @ inline_ не помогает – user2573552
И сохранение функции/данных в другом объекте не соответствует стратегии для настройки объекта dataframe – user2573552