2015-06-08 4 views
4

Я пытаюсь запустить следующий простой код Spark:org.apache.spark.SparkException: Задача не сериализации - JavaSparkContext

Gson gson = new Gson(); 
JavaRDD<String> stringRdd = jsc.textFile("src/main/resources/META-INF/data/supplier.json"); 

JavaRDD<SupplierDTO> rdd = stringRdd.map(new Function<String, SupplierDTO>() 
{ 
    private static final long serialVersionUID = -78238876849074973L; 

    @Override 
    public SupplierDTO call(String str) throws Exception 
    { 
     return gson.fromJson(str, SupplierDTO.class); 
    } 
}); 

Но это бросает следующее сообщение об ошибке при выполнении stringRdd.map заявления:

org.apache.spark.SparkException: Task not serializable 
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166) 
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158) 
at org.apache.spark.SparkContext.clean(SparkContext.scala:1478) 
at org.apache.spark.rdd.RDD.map(RDD.scala:288) 
at org.apache.spark.api.java.JavaRDDLike$class.map(JavaRDDLike.scala:78) 
at org.apache.spark.api.java.JavaRDD.map(JavaRDD.scala:32) 
at com.demo.spark.processor.cassandra.CassandraDataUploader.uploadData(CassandraDataUploader.java:71) 
at com.demo.spark.processor.cassandra.CassandraDataUploader.main(CassandraDataUploader.java:47) 
Caused by: java.io.NotSerializableException: org.apache.spark.api.java.JavaSparkContext 
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184) 
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) 
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) 
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) 
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) 
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) 
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) 
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) 
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) 
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) 
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) 
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) 
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) 
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348) 
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42) 
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:73) 
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:164) 
... 7 more 

Здесь 'jsc' - это JavaSparkContext объект, который я использую. Насколько я знаю, JavaSparkContext не является объектом Serializable, и его нельзя использовать в каких-либо функциях, которые будут отправлены работникам искры.

Теперь, что я не могу понять, как экземпляр JavaSparkContext отправляется рабочим? Что я должен изменить в своем коде, чтобы избежать такого сценария?

+0

Ты уверен, что все журнал? Я думаю, что это проблема с Gson, так как раньше я сталкивался с той же проблемой. – nhahtdh

+0

Вот полный журнал. –

+0

Вы можете вставить класс SupplierDTO здесь? – Sathish

ответ

5

Ссылка gson ссылается на «вытягивание» внешнего класса в область действия замыкания, с его полным графиком объекта.

В этом случае создайте объект gson внутри крышки:

public SupplierDTO call(String str) throws Exception { 
    Gson gson = Gson(); 
    return gson.fromJson(str, SupplierDTO.class); 
} 

Вы можете также объявить искровой контекст transient

Если создание экземпляра Gson является дорогостоящим, рекомендуется использовать mapPartitions вместо map.

+1

Создание объекта Gson в методе вызова не имеет никакого значения. Ни два других варианта, о которых вы говорили. Пожалуйста помоги. –

+0

@ArkaGhosh, добавляя 'transient' к объявлению' sparkContext', должно, как минимум, изменять генерируемое исключение. – maasg

+0

вы можете упомянуть полностью классифицированное имя этой аннотации «переходный». –

3

Для меня эта проблема решена с помощью одного из следующих вариантов:

  1. Как уже упоминалось выше, объявляя SparkContext, как transient
  2. Вы могли бы также попытаться сделать объект gson статический static Gson gson = new Gson();

См. Документ Job aborted due to stage failure: Task not serializable

t о увидеть другие доступные варианты, чтобы решить эту problème

+0

У меня тоже был такой же вопрос. Я обозначил свою контекстную ссылку как временную. Я работал – BDR

+0

Статический Gson отлично работает –

0

Вы можете использовать ниже код вместо линии 9. (return gson.fromJson(str, SupplierDTO.class);)

return new Gson().fromJson(str, SupplierDTO.class);//this is correct 

и удалить линии 1. (Gson gson = new Gson();)

Смежные вопросы