2015-03-06 7 views
2

Я пытаюсь использовать Neo4j с Apache Spark Streaming, но я нахожу сериализуемость как проблему.Использование Neo4j с Apache Spark

В принципе, я хочу, чтобы Apache Spark анализировал и собирал мои данные в реальном времени. После того, как данные были добавлены, он должен быть сохранен в базе данных Neo4j. Тем не менее, я получаю эту ошибку:

org.apache.spark.SparkException: Task not serializable 
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166) 
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158) 
    at org.apache.spark.SparkContext.clean(SparkContext.scala:1264) 
    at org.apache.spark.api.java.JavaRDDLike$class.foreach(JavaRDDLike.scala:297) 
    at org.apache.spark.api.java.JavaPairRDD.foreach(JavaPairRDD.scala:45) 
    at twoGrams.Main$4.call(Main.java:102) 
    at twoGrams.Main$4.call(Main.java:1) 
    at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$2.apply(JavaDStreamLike.scala:282) 
    at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$2.apply(JavaDStreamLike.scala:282) 
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:41) 
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40) 
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40) 
    at scala.util.Try$.apply(Try.scala:161) 
    at org.apache.spark.streaming.scheduler.Job.run(Job.scala:32) 
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:172) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 
    at java.lang.Thread.run(Thread.java:745) 
Caused by: java.io.NotSerializableException: org.neo4j.kernel.EmbeddedGraphDatabase 
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183) 
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547) 
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508) 
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431) 
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177) 
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547) 
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508) 
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431) 
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177) 
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547) 
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508) 
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431) 
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177) 
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347) 
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42) 
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:73) 
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:164) 
    ... 17 more 

Вот мой код:

output a stream of type: JavaPairDStream<String, ArrayList<String>> 

output.foreachRDD(
       new Function2<JavaPairRDD<String,ArrayList<String>>,Time,Void>(){ 

        @Override 
        public Void call(
          JavaPairRDD<String, ArrayList<String>> arg0, 
          Time arg1) throws Exception { 
         // TODO Auto-generated method stub 

         arg0.foreach(
           new VoidFunction<Tuple2<String,ArrayList<String>>>(){ 

            @Override 
            public void call(
              Tuple2<String, ArrayList<String>> arg0) 
              throws Exception { 
             // TODO Auto-generated method stub 
             try(Transaction tx = graphDB.beginTx()){ 
              if(Neo4jOperations.getHMacFromValue(graphDB, arg0._1)!=null) 
               System.out.println("Alread in Database:" + arg0._1); 
              else{ 
               Neo4jOperations.createHMac(graphDB, arg0._1); 
              } 
              tx.success(); 
             } 
            } 

         }); 
         return null; 
        } 



       }); 

Neo4jOperations Класс:

public class Neo4jOperations{ 

public static Node getHMacFromValue(GraphDatabaseService graphDB,String value){ 
     try(ResourceIterator<Node> HMacs=graphDB.findNodesByLabelAndProperty(DynamicLabel.label("HMac"), "value", value).iterator()){ 
      return HMacs.next(); 
     } 
    } 

    public static void createHMac(GraphDatabaseService graphDB,String value){ 
     Node HMac=graphDB.createNode(DynamicLabel.label("HMac")); 
     HMac.setProperty("value", value); 
     HMac.setProperty("time", new SimpleDateFormat("yyyyMMdd_HHmmss").format(Calendar.getInstance().getTime())); 
    } 
} 

Я знаю, что я должен сериализовать Neo4jOperations класса, но я «Я могу понять, как это сделать. Или есть ли другой способ достичь этого?

ответ

0

Существует не возможный способ сериализации транзитивных зависимостей, включенных в класс Neo4jOperations. Спарк, к сожалению, так не работает.

Проблема заключается в том, что API обхода Neo4j не может быть сериализован или связан и отправлен в Spark. Даже если вы попытаетесь связать Spark с Neo4j, вы столкнетесь с конфликтами зависимостей с версиями сервлетов Jetty.

Именно поэтому я создал Neo4j Mazerunner. Пока не будет создан коннектор Neo4j Spark, который расширяет базовые классы пакета Spark RDD, не будет простого способа передачи данных из Neo4j в среду выполнения Spark.

См. Couchbase's Spark Connector, чтобы получить представление о том, что связано с этим.

Mazerunner пока не поддерживает потоковое возможности, но у меня есть планы, чтобы сделать это произойдет в будущем

+0

Итак, в основном ваше предложение состоит в том, что я не могу подключить Spark Streaming и Neo4j? – d34th4ck3r

1

В тех случаях, когда соединения с внешними системами участвуют или при работе с несериализуемых объектов, можно создать эти объекты непосредственно на рабочих и избежать необходимости сериализации.

Given: val stream: DStream = ??? 
stream.forEachRDD{rdd => 
    rdd.forEachPartition{iter => 
     val nonSerializableConn = new NonSerializableDriver(ip, port) 
     iter.foreach(elem => nonSerializableConn.doStuff(elem) 
    } 
} 

Эта модель амортизирует создание объекта, делая это только один раз в раздел (который будет содержать много элементов)

В долгоживущих процессах, как искра Streaming, мы можем еще больше снизить накладные расходы, сохраняя за -VM кэш ресурсов:

stream.forEachRDD{rdd => 
    rdd.forEachPartition{iter => 
     val nonSerializableConn = NonSerializableDriver.getConnection(ip, port) 
     iter.foreach(elem => nonSerializableConn.doStuff(elem) 
    } 
} 

В последнем случае нам необходимо выполнить управление подключением и закрыть ресурсы, когда VM завершает работу.

+1

Интересный шаблон. Я думаю, что проблема с этим подходом заключается в том, что Java-код не сериализуется для Neo4j, о котором я знаю. Возможно, встроенная база данных графиков Neo4j может быть объединена в толстый JAR и отправлена ​​в Spark. Я думаю, что имеет смысл использовать TinkerPop для чего-то подобного. Наконец, сервер Neo4j предоставляет REST API, поэтому драйверы - это клиентские библиотеки, которые обертывают HTTP-запросы. Вы должны сделать запросы долговечными и использовать пакетный API. Не идеально. –

Смежные вопросы