Я пытаюсь работать с Spark и получаю сообщение об ошибке при запуске скрипта Scala удаленно.Scala with Spark - java.io.OptionalDataException при создании нового SparkContext
Я работаю с искрой 1.4.1, и импортировать эту зависимость в Maven построить:
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.10</artifactId>
<version>1.4.1</version>
</dependency>
Это упрощенный код я бегу:
package test.package
import org.apache.spark.{ SparkConf, SparkContext }
object App {
def main(args: Array[String]) {
System.out.println("Setting up Spark..")
val sparkConf = new SparkConf().setAppName("HBaseRead").setMaster("spark://141.161.88.169:7077")
val sc = new SparkContext(sparkConf) // Error occurs here.
System.out.println("Done!")
}
}
И вот трассировка стека из журналов на ведущем приборе Spark:
15/10/08 14:12:55 WARN ReliableDeliverySupervisor: Association with remote system [akka.tcp://[email protected]:55578] has failed, address is now gated for [5000] ms. Reason is: [null].
15/10/08 14:13:15 INFO Master: akka.tcp://[email protected]:55578 got disassociated, removing it.
15/10/08 14:13:15 ERROR Remoting:
java.io.OptionalDataException
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1348)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:350)
at scala.collection.immutable.$colon$colon.readObject(List.scala:366)
at sun.reflect.GeneratedMethodAccessor1.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
at java.lang.reflect.Method.invoke(Method.java:597)
at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:969)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1848)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1752)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1328)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:350)
at scala.collection.immutable.$colon$colon.readObject(List.scala:366)
at sun.reflect.GeneratedMethodAccessor1.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
at java.lang.reflect.Method.invoke(Method.java:597)
at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:969)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1848)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1752)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1328)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:350)
at scala.collection.immutable.$colon$colon.readObject(List.scala:366)
at sun.reflect.GeneratedMethodAccessor1.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
at java.lang.reflect.Method.invoke(Method.java:597)
at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:969)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1848)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1752)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1328)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:350)
at scala.collection.immutable.$colon$colon.readObject(List.scala:366)
at sun.reflect.GeneratedMethodAccessor1.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
at java.lang.reflect.Method.invoke(Method.java:597)
at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:969)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1848)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1752)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1328)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:350)
at scala.collection.immutable.$colon$colon.readObject(List.scala:366)
at sun.reflect.GeneratedMethodAccessor1.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
at java.lang.reflect.Method.invoke(Method.java:597)
at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:969)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1848)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1752)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1328)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:350)
at scala.collection.immutable.$colon$colon.readObject(List.scala:366)
at sun.reflect.GeneratedMethodAccessor1.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
at java.lang.reflect.Method.invoke(Method.java:597)
at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:969)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1848)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1752)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1328)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:350)
at scala.collection.immutable.$colon$colon.readObject(List.scala:366)
at sun.reflect.GeneratedMethodAccessor1.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
at java.lang.reflect.Method.invoke(Method.java:597)
at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:969)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1848)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1752)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1328)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:350)
at scala.collection.immutable.$colon$colon.readObject(List.scala:366)
at sun.reflect.GeneratedMethodAccessor1.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
at java.lang.reflect.Method.invoke(Method.java:597)
at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:969)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1848)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1752)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1328)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:350)
at scala.collection.immutable.$colon$colon.readObject(List.scala:366)
at sun.reflect.GeneratedMethodAccessor1.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
at java.lang.reflect.Method.invoke(Method.java:597)
at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:969)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1848)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1752)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1328)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:350)
at scala.collection.immutable.$colon$colon.readObject(List.scala:366)
at sun.reflect.GeneratedMethodAccessor1.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
at java.lang.reflect.Method.invoke(Method.java:597)
at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:969)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1848)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1752)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1328)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:350)
at scala.collection.immutable.$colon$colon.readObject(List.scala:366)
at sun.reflect.GeneratedMethodAccessor1.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
at java.lang.reflect.Method.invoke(Method.java:597)
at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:969)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1848)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1752)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1328)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:350)
at scala.collection.immutable.$colon$colon.readObject(List.scala:366)
at sun.reflect.GeneratedMethodAccessor1.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
at java.lang.reflect.Method.invoke(Method.java:597)
at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:969)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1848)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1752)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1328)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1946)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1870)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1752)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1328)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1946)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1870)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1752)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1328)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1946)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1870)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1752)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1328)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:350)
at akka.serialization.JavaSerializer$$anonfun$1.apply(Serializer.scala:136)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
at akka.serialization.JavaSerializer.fromBinary(Serializer.scala:136)
at akka.serialization.Serialization$$anonfun$deserialize$1.apply(Serialization.scala:104)
at scala.util.Try$.apply(Try.scala:161)
at akka.serialization.Serialization.deserialize(Serialization.scala:98)
at akka.remote.serialization.MessageContainerSerializer.fromBinary(MessageContainerSerializer.scala:63)
at akka.serialization.Serialization$$anonfun$deserialize$1.apply(Serialization.scala:104)
at scala.util.Try$.apply(Try.scala:161)
at akka.serialization.Serialization.deserialize(Serialization.scala:98)
at akka.remote.MessageSerializer$.deserialize(MessageSerializer.scala:23)
at akka.remote.DefaultMessageDispatcher.payload$lzycompute$1(Endpoint.scala:58)
at akka.remote.DefaultMessageDispatcher.payload$1(Endpoint.scala:58)
at akka.remote.DefaultMessageDispatcher.payloadClass$1(Endpoint.scala:59)
at akka.remote.DefaultMessageDispatcher.dispatch(Endpoint.scala:99)
at akka.remote.EndpointReader$$anonfun$receive$2.applyOrElse(Endpoint.scala:937)
at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
at akka.remote.EndpointActor.aroundReceive(Endpoint.scala:415)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
at akka.actor.ActorCell.invoke(ActorCell.scala:487)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
at akka.dispatch.Mailbox.run(Mailbox.scala:220)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Похоже на проблему сериализации с конфигурацией но не похоже, что что-то необычное.
EDIT: Добавление скриншота мастер-интерфейса.
Spark SS http://i61.tinypic.com/4uzcb9.png
IP-адрес, указанный в сообщении об ошибке (10.212.142.148), является IP-адресом моего ПК по VPN. В основном это говорит о том, что связь с клиентом, который отправляет работу, потерпела неудачу. Поэтому я не уверен, что это проблема. – ev0lution37
Я думаю, вы должны убедиться, что IP-адреса остаются неизменными, иначе это может вызвать проблемы. возможно, просто попробуйте использовать свой адрес VPN для setMaster («spark: //10.212.142.148: 7077») – keypoint
IP (10.212.142.148) не находится в кластере Spark, поэтому настройка как мастера не кажется как будто все будет. Я так не верю, но только для подтверждения, место, в котором я отправляю задание, не обязательно должно быть узлом в кластере, не так ли? – ev0lution37