2
У меня есть приложение Spark streaming, которое использует SparkSQL, написанное на Scala, которое пытается зарегистрировать udf после получения RDD. Я получаю ошибку ниже. Невозможно ли зарегистрировать udfs в приложении SparkStreaming?Регистрация UDF в приложении SparkStreaming
Вот фрагмент кода, который бросает ошибку:
sessionStream.foreachRDD((rdd: RDD[(String)], time: Time) => {
val sqlcc = SqlContextSingleton.getInstance(rdd.sparkContext)
sqlcc.udf.register("getUUID",() => java.util.UUID.randomUUID().toString)
...
}
Вот вбрасывание ошибка при попытке зарегистрировать функцию:
Exception in thread "pool-6-thread-6" java.lang.NoSuchMethodError: scala.reflect.api.JavaUniverse.runtimeMirror(Ljava/lang/ClassLoader;)Lscala/reflect/api/JavaUniverse$JavaMirror;
at com.ignitionone.datapipeline.ClusterApp$$anonfun$CreateCheckpointStreamContext$1.apply(ClusterApp.scala:173)
at com.ignitionone.datapipeline.ClusterApp$$anonfun$CreateCheckpointStreamContext$1.apply(ClusterApp.scala:164)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:42)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
at scala.util.Try$.apply(Try.scala:161)
at org.apache.spark.streaming.scheduler.Job.run(Job.scala:32)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:176)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:176)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:176)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:175)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)