2015-01-30 4 views
0

Я изучаю Spark 1.2, запустив его на своей локальной машине с одним мастером и одним рабочим. Я запускаю искру, запустив .sbin/start-all.shВ чем смысл этой ошибки от Apache Spark?

Мастер и рабочий включены, и я вижу их в ui. Если я запускаю программу sample word count из GitHub, это работает, если настроить искровой контекст так:

String[] jars = {"pathto/nlp.jar"}; 
SparkConf sparkConf = new SparkConf().setAppName("JavaWordCount").setMaster("spark://myurl:7077").setJars(jars); 

В моей Яве, я нарушу большой документ в предложения, как это:

JavaRDD<Iterator<List<HasWord>>> sentences = lines.flatMap(new FlatMapFunction<String, Iterator<List<HasWord>>>() { 
     /** 
    * 
    */ 
    private static final long serialVersionUID = 1L; 

    @Override 
     public Iterable<Iterator<List<HasWord>>> call(String s) { 
      return (Iterable<Iterator<List<HasWord>>>) new DocumentPreprocessor(s).iterator(); 
     } 
}); 

До сих пор так хорошо.

Тогда я распечатайте подсчет РДУ

System.out.println(sentences.count()); // This works fine. Prints an integer 

Теперь я хочу, чтобы попытаться отфильтровать некоторые из предложений (на данный момент, я просто фильтрует все из них всегда просто возвращает истину).

sentences = sentences.filter(new Function<Iterator<List<HasWord>>, Boolean>() { 
    /** 
    * 
    */ 
    private static final long serialVersionUID = 2L; 

@Override 
    public Boolean call(Iterator<List<HasWord>> s) { 
    return true; 
    } 
}); 

Функция работает нормально. Но если бы я затем и запустить

System.out.println(sentences.count()); 

Я получаю длинный трассировки стека:

15/01/30 16:47:18 INFO DAGScheduler: Job 0 failed: count at JavaWordCount.java:134, took 1.203987 s 
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 0.0 failed 4 times, most recent failure: Lost task 1.3 in stage 0.0 (TID 17, lens.att.net): java.io.InvalidClassException: nlp.nlp.JavaWordCount$1; local class incompatible: stream classdesc serialVersionUID = 1, local class serialVersionUID = 8625903781884920246 
    at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:621) 
    at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1623) 
    at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1518) 
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1774) 
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351) 
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993) 
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918) 
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801) 
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351) 
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993) 
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918) 
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801) 
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351) 
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993) 
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918) 
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801) 
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351) 
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371) 
    at scala.collection.immutable.$colon$colon.readObject(List.scala:362) 
    at sun.reflect.GeneratedMethodAccessor2.invoke(Unknown Source) 
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
    at java.lang.reflect.Method.invoke(Method.java:483) 
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017) 
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1896) 
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801) 
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351) 
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993) 
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918) 
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801) 
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351) 
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993) 
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918) 
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801) 
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351) 
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371) 
    at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62) 
    at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:87) 
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:57) 
    at org.apache.spark.scheduler.Task.run(Task.scala:56) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    at java.lang.Thread.run(Thread.java:745) 

Driver stacktrace: 
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1214) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1203) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1202) 
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) 
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) 
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1202) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696) 
    at scala.Option.foreach(Option.scala:236) 
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:696) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1420) 
    at akka.actor.Actor$class.aroundReceive(Actor.scala:465) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessActor.aroundReceive(DAGScheduler.scala:1375) 
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516) 
    at akka.actor.ActorCell.invoke(ActorCell.scala:487) 
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238) 
    at akka.dispatch.Mailbox.run(Mailbox.scala:220) 
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393) 
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) 
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) 
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) 
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) 

Я также получить (разные) трассировки стека, если я не объявить серийный идентификатор.

Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 3 in stage 1.0 failed 4 times, most recent failure: Lost task 3.3 in stage 1.0 (TID 68, lens.att.net): java.io.InvalidClassException: nlp.nlp.JavaWordCount$2; local class incompatible: stream classdesc serialVersionUID = 3752701569517815536, local class serialVersionUID = 6132153642693122455 
    at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:621) 
    at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1623) 
    at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1518) 
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1774) 
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351) 
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993) 
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918) 
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801) 
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351) 
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993) 
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918) 
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801) 
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351) 
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993) 
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918) 
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801) 
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351) 
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371) 
    at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62) 
    at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:87) 
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:57) 
    at org.apache.spark.scheduler.Task.run(Task.scala:56) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    at java.lang.Thread.run(Thread.java:745) 

Driver stacktrace: 
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1214) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1203) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1202) 
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) 
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) 
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1202) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696) 
    at scala.Option.foreach(Option.scala:236) 
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:696) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1420) 
    at akka.actor.Actor$class.aroundReceive(Actor.scala:465) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessActor.aroundReceive(DAGScheduler.scala:1375) 
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516) 
    at akka.actor.ActorCell.invoke(ActorCell.scala:487) 
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238) 
    at akka.dispatch.Mailbox.run(Mailbox.scala:220) 
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393) 
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) 
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) 
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) 
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) 

Кажется, что какой-то класс не декларирует серийный идентификатор должным образом. Но я получаю сообщение об ошибке, независимо от того, включают ли я последовательные идентификаторы (как показано выше)

Примечания

Я бегу это затмение. У меня есть Maven проект в затмении с этой конфигой:

<dependency> 
    <groupId>org.apache.spark</groupId> 
    <artifactId>spark-core_2.10</artifactId> 
    <version>1.2.0</version> 
</dependency> 

Я также работаю искру на моей локальной машине. Я загрузил в каталог pathto/spark-1.2.0-bin-hadoop2.4

+1

Звучит как проблема сериализации. Я не использую искру, поэтому я понятия не имею, в чем проблема, но по крайней мере это ключевое слово. – keyser

+0

Почему вы объявляете serialVersionUID? здесь это не нужно. –

+0

@SeanOwen Я не делал этого изначально, но это также вызывало ошибку – bernie2436

ответ

1

В моем случае это, похоже, проблема, возникающая, когда ваша искровая программа не синхронизируется со своими зависимостями в банке.

Моя программа загружает банки как этот

String[] jars = {"pathto/mydependencies.jar"}; 
SparkConf sparkConf = new SparkConf().setAppName("JavaWordCount").setMaster("spark://mylaptop:7077").setJars(jars); 

Если я внести изменения в основной программе, а затем запустить его в режиме отладки в Eclipse, я получаю эту ошибку. Но если я реэкспортирую в pathto/mydependencies.jar, он исправляет его.

2

Что нужно для серийного идентификатора? Что здесь происходит?

Класс, по которому жалуется исключение, составляет nlp.nlp.JavaWordCount$1. Это «имя» анонимного внутреннего класса.

Глядя на ваш код, я бы сказал, что это ваш анонимный класс FlatMapFunction. (Ключ в том, что вы видите идентификатор "1" в сообщении об ошибке.)


Вы используете одни и те же JAR-файлы на сериализации и десериализации сторон?В противном случае я предполагаю, что на одной из сторон отсутствует:

private static final long serialVersionUID = 1L; 

Исправление должно заключаться в использовании тех же JAR.

Но если JAR уже идентичны ... это странно.

Как возможное обходное решение, попробуйте преобразовать анонимный внутренний класс в (вложенный) вложенный класс ... или даже внешний класс. Если это сработает, вы можете использовать эту точку данных, чтобы помочь вам найти реальную проблему.

Если вы используете разные версии Spark в одном кластере, это может быть причиной. Рекомендуется использовать ту же самую версию везде.