2015-12-29 2 views
0

Я пытаюсь использовать класс Spring JMSTemplate внутри метода rdd.foreach, но я получаю ошибку Task Not Serializable. Когда я пытаюсь использовать статическую переменную, она работает на локальном, но в кластере, я получаю исключение из null-указателя.Задача Не Сериализуемое Исключение - При использовании JMSTemplate в Spark foreach

Пример кода:

inputRDD.foreach(record -> { 

        messageServices.send(record); 
} 

Журнал ошибок:

org.apache.spark.SparkException: Task not serializable 
     at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:315) 
     at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:305) 
     at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:132) 
     at org.apache.spark.SparkContext.clean(SparkContext.scala:1891) 
     at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:869) 
     at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:868) 
     at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:148) 
     at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:109) 
     at org.apache.spark.rdd.RDD.withScope(RDD.scala:286) 
     at org.apache.spark.rdd.RDD.foreach(RDD.scala:868) 
     at org.apache.spark.api.java.JavaRDDLike$class.foreach(JavaRDDLike.scala:327) 
     at org.apache.spark.api.java.AbstractJavaRDDLike.foreach(JavaRDDLike.scala:47) 
     at com.messenger.MessengerDriver.runJob(MessengerDriver.java:108) 
     at com.messenger.MessengerDriver.main(MessengerDriver.java:60) 
Caused by: java.io.NotSerializableException: org.springframework.jms.core.JmsTemplate 
Serialization stack: 
     - object not serializable (class: org.springframework.jms.core.JmsTemplate, value: [email protected]) 
     - field (class: com.messenger.Messenger.activemq.MessageProducer, name: jmsTemplate, type: class org.springframework.jms.core.JmsTemplate) 
     - object (class com.messenger.Messenger.activemq.MessageProducer, [email protected]) 
     - field (class: java.lang.invoke.SerializedLambda, name: capturedArgs, type: class [Ljava.lang.Object;) 
     - field (class: org.apache.spark.api.java.JavaRDDLike$$anonfun$foreach$1, name: f$14, type: interface org.apache.spark.api.java.function.VoidFunction) 
     - object (class org.apache.spark.api.java.JavaRDDLike$$anonfun$foreach$1, <function1>) 
     at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40) 
     at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47) 
     at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:81) 
     at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:312) 
     ... 13 more 

Кто-нибудь сталкивался с такой же вопрос?

ответ

0

Правильный шаблон для использования перераспределения & mapPartitions.
repartition - это карта RDD для подходящего размера;
mapPartitions предназначен для индивидуальной обработки каждого раздела, вы можете создать JMSTemplate для каждого раздела внутри передающей функции.

+0

Я исправил проблему, используя foreachPartition только .. но я не использовал перераспределение .. я сделаю это .. спасибо – Shankar

Смежные вопросы