2015-11-20 2 views
3

Мы используем Apache Spark 1.5.1 и kafka_2.10-0.8.2.1 и API Kafka DirectStream для извлечения данных из Кафка использует Искра.Не удалось найти лидеров для Set ([TOPICNNAME, 0])) Когда мы находимся в Apache Saprk

Мы создали темы Кафки со следующими параметрами

ReplicationFactor: 1 и Реплика: 1

Когда все экземпляры Кафки работают, работа Спарк отлично работает. Однако, когда один из экземпляров Kafka в кластере отключен, мы получаем исключение, воспроизведенное ниже. Через некоторое время мы перезапустили отключенный экземпляр Kafka и попытались закончить работу Spark, но Spark был уже завершен из-за исключения. Из-за этого мы не могли прочитать оставшиеся сообщения в темах Kafka.

ERROR DirectKafkaInputDStream:125 - ArrayBuffer(org.apache.spark.SparkException: Couldn't find leaders for Set([normalized-tenant4,0])) 
ERROR JobScheduler:96 - Error generating jobs for time 1447929990000 ms 
org.apache.spark.SparkException: ArrayBuffer(org.apache.spark.SparkException: Couldn't find leaders for Set([normalized-tenant4,0])) 
     at org.apache.spark.streaming.kafka.DirectKafkaInputDStream.latestLeaderOffsets(DirectKafkaInputDStream.scala:123) 
     at org.apache.spark.streaming.kafka.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:145) 
     at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350) 
     at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350) 
     at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57) 
     at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349) 
     at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349) 
     at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:399) 
     at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344) 
     at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:342) 
     at scala.Option.orElse(Option.scala:257) 
     at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:339) 
     at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:38) 
     at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:120) 
     at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:120) 
     at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251) 
     at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251) 
     at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) 
     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) 
     at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251) 
     at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105) 
     at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:120) 
     at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$2.apply(JobGenerator.scala:247) 
     at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$2.apply(JobGenerator.scala:245) 
     at scala.util.Try$.apply(Try.scala:161) 
     at org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:245) 
     at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:181) 
     at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:87) 
     at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:86) 
     at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) 

Заранее спасибо. Пожалуйста, помогите решить эту проблему.

ответ

4

Ожидаемое поведение. Вы запросили сохранение каждой темы на одной машине, установив ReplicationFactor на один. Когда одна машина, которая пытается сохранить тему normalized-tenant4, снимается, потребитель не может найти лидера темы.

См. http://kafka.apache.org/documentation.html#intro_guarantees.

1

Одна из причин такого типа ошибок, когда лидер не может быть найден для указанной темы - проблема с конфигурациями сервера Kafka.

Откройте конфигу сервера Кафки:

vim ./kafka/kafka-<your-version>/config/server.properties 

В разделе «Настройка сокет сервер» раздела, обеспечивает IP для хоста, если его недостающий:

listeners=PLAINTEXT://{host-ip}:{host-port} 

Я использовал установку Кафки снабжен MapR песочница и пыталась получить доступ к кафке с помощью искрового кода. Я получал ту же ошибку при доступе к моей кафке, так как моя конфигурация не пропускала IP.

Смежные вопросы