
Spark Streaming job fails with ArrayBuffer(kafka.common.NotLeaderForPartitionException)

A Spark Streaming job (Spark 1.6.1, Kafka 0.9.0) consumes from a Kafka topic with 20 partitions. Offsets are stored in an Oracle DB.

When the job starts, it reads the offsets from Oracle (read once), and after processing each batch it writes the offsets back to Oracle.

The job ran successfully for 8 hours and then failed for the reason below. At the time of the failure nothing had changed in the Kafka topic, the Spark program, or the Oracle code.
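
For context, the job follows the standard direct-stream pattern, roughly like the sketch below (Spark 1.6.x API; loadOffsetsFromOracle/saveOffsetsToOracle are simplified placeholders for the actual JDBC code, and the broker list is made up):

import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils, OffsetRange}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object OffsetCheckpointedJob {
  // Placeholder: SELECT topic, partition, offset FROM ... for all 20 partitions
  def loadOffsetsFromOracle(): Map[TopicAndPartition, Long] = ???
  // Placeholder: upsert one row per partition after the batch is processed
  def saveOffsetsToOracle(ranges: Array[OffsetRange]): Unit = ???

  def main(args: Array[String]): Unit = {
    val ssc = new StreamingContext(new SparkConf().setAppName("MyTopicConsumer"), Seconds(10))
    val kafkaParams = Map("metadata.broker.list" -> "broker1:9092,broker2:9092")

    // Offsets are read from Oracle once, at job start.
    val fromOffsets = loadOffsetsFromOracle()
    val messageHandler = (mmd: MessageAndMetadata[String, String]) => (mmd.key, mmd.message)

    val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder,
      (String, String)](ssc, kafkaParams, fromOffsets, messageHandler)

    stream.foreachRDD { rdd =>
      val ranges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      // ... process the batch ...
      saveOffsetsToOracle(ranges) // offsets written back only after processing
    }

    ssc.start()
    ssc.awaitTermination()
  }
}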

Can anyone tell me why the Spark Streaming job gets this error?

16/11/02 08:09:21 ERROR JobScheduler: Error generating jobs for time 1478074160000 ms 
org.apache.spark.SparkException: ArrayBuffer(kafka.common.NotLeaderForPartitionException, org.apache.spark.SparkException: Couldn't find leader offsets for Set([MyTopic,11])) 
    at org.apache.spark.streaming.kafka.DirectKafkaInputDStream.latestLeaderOffsets(DirectKafkaInputDStream.scala:123) 
    at org.apache.spark.streaming.kafka.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:145) 
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:352) 
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:352) 
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57) 
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:351) 
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:351) 
    at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:426) 
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:346) 
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344) 
    at scala.Option.orElse(Option.scala:257) 
    at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:341) 
    at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:47) 
    at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:115) 
    at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:114) 
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251) 
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251) 
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) 
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) 
    at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251) 
    at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105) 
    at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:114) 
    at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:248) 
    at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:246) 
    at scala.util.Try$.apply(Try.scala:161) 
    at org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:246) 
    at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:181) 
    at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:87) 
    at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:86) 
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) 
Exception in thread "main" java.lang.reflect.InvocationTargetException 
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
    at java.lang.reflect.Method.invoke(Method.java:497) 
    at org.apache.spark.deploy.worker.DriverWrapper$.main(DriverWrapper.scala:58) 
    at org.apache.spark.deploy.worker.DriverWrapper.main(DriverWrapper.scala) 
Caused by: org.apache.spark.SparkException: ArrayBuffer(kafka.common.NotLeaderForPartitionException, org.apache.spark.SparkException: Couldn't find leader offsets for Set([MyTopic,11])) 
    at org.apache.spark.streaming.kafka.DirectKafkaInputDStream.latestLeaderOffsets(DirectKafkaInputDStream.scala:123) 
    at org.apache.spark.streaming.kafka.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:145) 
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:352) 
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:352) 
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57) 
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:351) 
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:351) 
    at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:426) 
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:346) 
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344) 
    at scala.Option.orElse(Option.scala:257) 
    at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:341) 
    at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:47) 
    at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:115) 
    at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:114) 
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251) 
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251) 
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) 
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) 
    at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251) 
    at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105) 
    at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:114) 
    at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:248) 
    at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:246) 
    at scala.util.Try$.apply(Try.scala:161) 
    at org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:246) 
    at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:181) 
    at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:87) 
    at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:86) 
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) 
16/11/02 08:09:21 INFO StreamingContext: Invoking stop(stopGracefully=false) from shutdown hook 
16/11/02 08:09:21 INFO JobGenerator: Stopping JobGenerator immediately 
16/11/02 08:09:21 INFO RecurringTimer: Stopped timer for JobGenerator after time 1478074160000 
16/11/02 08:09:21 INFO JobGenerator: Stopped JobGenerator 
16/11/02 08:09:21 INFO JobScheduler: Stopped JobScheduler 
16/11/02 08:09:21 INFO StreamingContext: StreamingContext stopped successfully 
16/11/02 08:09:21 INFO SparkContext: Invoking stop() from shutdown hook 
16/11/02 08:09:21 INFO SparkUI: Stopped Spark web UI at http://10.251.228.103:4040 
16/11/02 08:09:21 INFO SparkDeploySchedulerBackend: Shutting down all executors 
16/11/02 08:09:21 INFO SparkDeploySchedulerBackend: Asking each executor to shut down 
16/11/02 08:09:21 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped! 
16/11/02 08:09:21 INFO MemoryStore: MemoryStore cleared 
16/11/02 08:09:21 INFO BlockManager: BlockManager stopped 
16/11/02 08:09:21 INFO BlockManagerMaster: BlockManagerMaster stopped 
16/11/02 08:09:21 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped! 
16/11/02 08:09:21 INFO SparkContext: Successfully stopped SparkContext 
16/11/02 08:09:21 INFO ShutdownHookManager: Shutdown hook called 
16/11/02 08:09:21 INFO ShutdownHookManager: Deleting directory /app/spark/spark-1.6.1-bin-hadoop2.6/local/spark-30fb329c-3ccf-4d8c-a06c-2d36e6f968b3/httpd-f81472a2-3262-4eea-8d64-7ff96d2ef3e5 
16/11/02 08:09:21 INFO ShutdownHookManager: Deleting directory /app/spark/spark-1.6.1-bin-hadoop2.6/local/spark-30fb329c-3ccf-4d8c-a06c-2d36e6f968b3 

Check the logs on the Kafka/ZooKeeper side. Make sure all the brokers are up and healthy, and that the disks are not full. –

Answer


For me, the problem was that my Kafka server was down. It was easy enough to bring it back up:

./bin/kafka-server-start.sh -daemon config/server.properties 
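Once the broker is back, you can verify that every partition has a leader again with the stock topics tool (the ZooKeeper address below is a placeholder for your own quorum):

./bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic MyTopic

A partition reported with Leader: -1 is exactly the condition that produces the NotLeaderForPartitionException above.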

Especially since you are storing the topic/offset information externally, it looks like Set([MyTopic,11]) is coming from your Oracle DB, but Kafka is unavailable to look up a matching topic and offset. – sbrannon
