2014-12-30 3 views
7

Я использую искру с кассандрой, и я hava a JavaRDD<String> клиентов. И для каждого клиента, я хочу, чтобы выбрать из Кассандры его взаимодействий, как это:JavaSparkContext не serializable

avaPairRDD<String, List<InteractionByMonthAndCustomer>> a = client.mapToPair(new PairFunction<String, String, List<InteractionByMonthAndCustomer>>() { 
     @Override 
     public Tuple2<String, List<InteractionByMonthAndCustomer>> call(String s) throws Exception {    
      List<InteractionByMonthAndCustomer> b = javaFunctions(sc) 
        .cassandraTable(CASSANDRA_SCHEMA, "interaction_by_month_customer") 
        .where("ctid =?", s) 
        .map(new Function<CassandraRow, InteractionByMonthAndCustomer>() { 
         @Override 
         public InteractionByMonthAndCustomer call(CassandraRow cassandraRow) throws Exception { 
          return new InteractionByMonthAndCustomer(cassandraRow.getString("channel"), 
            cassandraRow.getString("motif"), 
            cassandraRow.getDate("start"), 
            cassandraRow.getDate("end"), 
            cassandraRow.getString("ctid"), 
            cassandraRow.getString("month") 
          ); 
         } 
        }).collect(); 
      return new Tuple2<String, List<InteractionByMonthAndCustomer>>(s, b); 
     } 
    }); 

Для этого я использую один JavaSparkContext sc. Но я получил эту ошибку:

Exception in thread "main" org.apache.spark.SparkException: Task not serializable 
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166) 
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158) 
at org.apache.spark.SparkContext.clean(SparkContext.scala:1242) 
at org.apache.spark.rdd.RDD.map(RDD.scala:270) 
at org.apache.spark.api.java.JavaRDDLike$class.mapToPair(JavaRDDLike.scala:99) 
at org.apache.spark.api.java.JavaRDD.mapToPair(JavaRDD.scala:32) 
at fr.aid.cim.spark.dao.GenrateCustumorJourney.AllCleintInteractions(GenrateCustumorJourney.java:91) 
at fr.aid.cim.spark.dao.GenrateCustumorJourney.main(GenrateCustumorJourney.java:75) 
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
at java.lang.reflect.Method.invoke(Method.java:483) 
at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:328) 
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75) 
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) 
Caused by: java.io.NotSerializableException: org.apache.spark.api.java.JavaSparkContext 
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184) 
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) 
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) 
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) 
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) 
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) 
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) 
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) 
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) 
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348) 
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42) 
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:73) 
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:164) 
... 14 more 

Я думаю, что JavaSparkContext должен быть сериализуемым. Но как я могу сделать это сериализуемым, пожалуйста?

спасибо.

ответ

12

Нет, JavaSparkContext не является сериализуемым и не должен быть. Он не может использоваться в функции, которую вы отправляете удаленным сотрудникам. Здесь вы явно не ссылаетесь на него, но ссылка все равно сериализуется, потому что ваша анонимная внутренняя функция класса не является static и поэтому имеет ссылку на охватывающий класс.

Попробуйте переписать код с помощью этой функции в виде автономного объекта static.

0

Вы не можете использовать SparkContext и создавать другие RDD внутри исполнителя (функция отображения RDD).

Вам необходимо создать Cassandra RDD (sc.cassandraTable) в драйвере, а затем выполнить соединение между этими двумя RDD (клиентское RDD и таблица Cassandra RDD).

+0

Правда, код не должен работать так, как (Спарк запрещает преобразование внутри преобразования и т.д ..) –

0

Объявить его с ключевым словом: transient

private transient JavaSparkContext sparkContext; 
Смежные вопросы