2015-06-16 4 views
2

У меня есть приложение Spark streaming, которое использует SparkSQL, написанное на Scala, которое пытается зарегистрировать udf после получения RDD. Я получаю ошибку ниже. Невозможно ли зарегистрировать udfs в приложении SparkStreaming?Регистрация UDF в приложении SparkStreaming

Вот фрагмент кода, который бросает ошибку:

sessionStream.foreachRDD((rdd: RDD[(String)], time: Time) => { 
     val sqlcc = SqlContextSingleton.getInstance(rdd.sparkContext) 
     sqlcc.udf.register("getUUID",() => java.util.UUID.randomUUID().toString) 
... 
} 

Вот вбрасывание ошибка при попытке зарегистрировать функцию:

Exception in thread "pool-6-thread-6" java.lang.NoSuchMethodError: scala.reflect.api.JavaUniverse.runtimeMirror(Ljava/lang/ClassLoader;)Lscala/reflect/api/JavaUniverse$JavaMirror; 
    at com.ignitionone.datapipeline.ClusterApp$$anonfun$CreateCheckpointStreamContext$1.apply(ClusterApp.scala:173) 
    at com.ignitionone.datapipeline.ClusterApp$$anonfun$CreateCheckpointStreamContext$1.apply(ClusterApp.scala:164) 
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:42) 
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40) 
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40) 
    at scala.util.Try$.apply(Try.scala:161) 
    at org.apache.spark.streaming.scheduler.Job.run(Job.scala:32) 
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:176) 
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:176) 
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:176) 
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57) 
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:175) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 
    at java.lang.Thread.run(Thread.java:745) 

ответ

1
sessionStream.foreachRDD((rdd: RDD[Event], time: Time) => { 
    val f = (t: Long) => t - t % 60000  

    val sqlContext = SQLContext.getOrCreate(rdd.sparkContext) 
    import sqlContext.implicits._ 
    val df = rdd.toDF() 

    val per_min = udf(f) 
    val grouped = df.groupBy(per_min(df("created_at")) as "created_at", 
          df("blah"), 
          df("status") 
          ).agg(sum("price") as "price",sum("payout") as "payout", sum("counter") as "counter") 
    ... 
} 

работает отлично мной

Смежные вопросы