Here «s пример aggregateByKey на mutable.HashSet [String], написанная @bbejeckСпарк aggregateByKey на Dataset
val initialSet = mutable.HashSet.empty[String]
val addToSet = (s: mutable.HashSet[String], v: String) => s += v
val mergePartitionSets = (p1: mutable.HashSet[String], p2: mutable.HashSet[String]) => p1 ++= p2
val uniqueByKey = kv.aggregateByKey(initialSet)(addToSet, mergePartitionSets)
Но когда я изменил Dataset, я получил следующее сообщение об ошибке, в том, что из-за Спарк 2.0 (версия, которую я использую) не поддерживает aggregateByKey в Dataset?
java.lang.NullPointerException
at org.apache.spark.sql.Dataset.schema(Dataset.scala:393)
at org.apache.spark.sql.Dataset.toDF(Dataset.scala:339)
at org.apache.spark.sql.Dataset.showString(Dataset.scala:239)
at org.apache.spark.sql.Dataset.show(Dataset.scala:526)
at org.apache.spark.sql.Dataset.show(Dataset.scala:486)
at org.apache.spark.sql.Dataset.show(Dataset.scala:495)
Вот код:
case class Food(name: String,
price: String,
e_date: String)
rdd.aggregateByKey(Seq(Food("", "", "")).toDS)(
(f1,f2) => f1.union(f2),
(f1,f2) => f1.union(f2))
/////////
found f1 = Invalid tree; null:
null
Любые идеи, почему это происходит, спасибо заранее!
Спасибо @Bigby, я удалил .toDS, он работает сейчас – faustineinsun