2016-06-27 6 views
0

Я пытаюсь использовать 2,0 наборов данных Apache искру в:SparkSQL агрегатор: MissingRequirementError

import org.apache.spark.sql.expressions.Aggregator 
import org.apache.spark.sql.Encoder 
import spark.implicits._ 

case class C1(f1: String, f2: String, f3: String, f4: String, f5: Double) 

val teams = Seq(
    C1("hash1", "NLC", "Cubs", "2016-01-23", 3253.21), 
    C1("hash1", "NLC", "Cubs", "2014-01-23", 353.88), 
    C1("hash3", "NLW", "Dodgers", "2013-08-15", 4322.12), 
    C1("hash4", "NLE", "Red Sox", "2010-03-14", 10283.72) 
).toDS() 

val c1Agg = new Aggregator[C1, Seq[C1], Seq[C1]] with Serializable { 
    def zero: Seq[C1] = Seq.empty[C1] //Nil 
    def reduce(b: Seq[C1], a: C1): Seq[C1] = b :+ a 
    def merge(b1: Seq[C1], b2: Seq[C1]): Seq[C1] = b1 ++ b2 
    def finish(r: Seq[C1]): Seq[C1] = r 

    override def bufferEncoder: Encoder[Seq[C1]] = newProductSeqEncoder[C1] 
    override def outputEncoder: Encoder[Seq[C1]] = newProductSeqEncoder[C1] 
}.toColumn 

val g_c1 = teams.groupByKey(_.f1).agg(c1Agg).collect 

Но тогда, когда я запускаю его я получил следующее сообщение об ошибке:

scala.reflect.internal.MissingRequirementError: class lineb4c2bb72bf6e417e9975d1a65602aec912.$read in JavaMirror with [email protected] of type class sun.misc.Launcher$AppClassLoader with class path [OMITTED] not found

Я предполагаю, что конфигурация правильно, потому что я бегу под облаком сообщества Databricks.

ответ

0

я, наконец, смог сделать его работу с помощью ExpressionEncoder() вместо newProductSeqEncoder [C1] в строках 20, 21.

(Не знаю, почему предыдущий код не работает, хотя.)

Смежные вопросы