Я хочу использовать SparkContext и SQLContext внутри foreachPartition
, но не смог это сделать из-за ошибки сериализации. Я знаю, что оба объекта не сериализации, но я думал, что foreachPartition
выполняется на хозяина, где и Спарк контекст и SQLContext доступны ..Как использовать SQLContext и SparkContext внутри foreachPartition
нотация:
`msg -> Map[String,String]`
`result -> Iterable[Seq[Row]]`
Это мой текущий код (UtilsDM - объект, который extends Serializable
). Часть кода, которая не работает, начинается с val schema =...
, где я хочу написать result
в DataFrame
, а затем сохранить его в Паркет. Возможно, способ, которым я организовал код, неэффективен, тогда я бы хотел здесь ваши рекомендации. Благодарю.
// Here I am creating df from parquet file on S3
val exists = FileSystem.get(new URI("s3n://" + bucketNameCode), sc.hadoopConfiguration).exists(new Path("s3n://" + bucketNameCode + "/" + pathToSentMessages))
var df: DataFrame = null
if (exists) {
df = sqlContext
.read.parquet("s3n://bucket/pathToParquetFile")
}
UtilsDM.setDF(df)
// Here I process myDStream
myDStream.foreachRDD(rdd => {
rdd.foreachPartition{iter =>
val r = new RedisClient(UtilsDM.getHost, UtilsDM.getPort)
val producer = UtilsDM.createProducer
var df = UtilsDM.getDF
val result = iter.map{ msg =>
// ...
Seq(msg("key"),msg("value"))
}
// HERE I WANT TO WRITE result TO S3, BUT IT FAILS
val schema = StructType(
StructField("key", StringType, true) ::
StructField("value", StringType, true)
result.foreach { row =>
val rdd = sc.makeRDD(row)
val df2 = sqlContext.createDataFrame(rdd, schema)
// If the parquet file is not created, then create it
var df_final: DataFrame = null
if (df != null) {
df_final = df.unionAll(df2)
} else {
df_final = df2
}
df_final.write.parquet("s3n://bucket/pathToSentMessages)
}
}
})
EDIT:
Я использую Спарк 1.6.2 и Scala 2.10.6.
версию искры вы используете? – mrsrinivas
@MRSrinivas: Я использую Spark 1.6.2 и Scala 2.10.6. Извините, что не упоминал об этом. – duckertito