2

Here «s пример aggregateByKey на mutable.HashSet [String], написанная @bbejeckСпарк aggregateByKey на Dataset

val initialSet = mutable.HashSet.empty[String] 
val addToSet = (s: mutable.HashSet[String], v: String) => s += v 
val mergePartitionSets = (p1: mutable.HashSet[String], p2: mutable.HashSet[String]) => p1 ++= p2 
val uniqueByKey = kv.aggregateByKey(initialSet)(addToSet, mergePartitionSets) 

Но когда я изменил Dataset, я получил следующее сообщение об ошибке, в том, что из-за Спарк 2.0 (версия, которую я использую) не поддерживает aggregateByKey в Dataset?

java.lang.NullPointerException 
at org.apache.spark.sql.Dataset.schema(Dataset.scala:393) 
at org.apache.spark.sql.Dataset.toDF(Dataset.scala:339) 
at org.apache.spark.sql.Dataset.showString(Dataset.scala:239) 
at org.apache.spark.sql.Dataset.show(Dataset.scala:526) 
at org.apache.spark.sql.Dataset.show(Dataset.scala:486) 
at org.apache.spark.sql.Dataset.show(Dataset.scala:495) 

Вот код:

case class Food(name: String, 
       price: String, 
       e_date: String)  
rdd.aggregateByKey(Seq(Food("", "", "")).toDS)( 
        (f1,f2) => f1.union(f2), 
        (f1,f2) => f1.union(f2)) 
///////// 
found f1 = Invalid tree; null: 
        null 

Любые идеи, почему это происходит, спасибо заранее!

ответ

2

Да, я думаю, что aggregateByKey работает только с rdd.
здесь документация (это для питона)
http://spark.apache.org/docs/latest/api/python/pyspark.html

Удалить .toDS и попробуйте код. Возможно преобразование его в DS после завершения агрегации (не уверен, что это будет лучше в производительности).

+0

Спасибо @Bigby, я удалил .toDS, он работает сейчас – faustineinsun

Смежные вопросы