2015-12-21 6 views
16

Я написал kafkaconsumer и producer, который отлично работает до сегодняшнего дня. Сегодня утром, когда я начал zooekeeper и kafka, мой потребитель не удалось прочитать сообщения и Zookeeper log я прочитал эту ошибкуОшибка Zookeeper & Kafka KeeperErrorCode = NodeExists

INFO Got user-level KeeperException when processing sessionid:0x151c41e62e10000 type:create cxid:0x2a zxid:0x1e txntype:-1 reqpath:n/a Error Path:/brokers/ids Error:KeeperErrorCode = NodeExists for /brokers/ids (org.apache.zookeeper.server.PrepRequestProcessor) 

Не могли бы вы мне помочь? Что могло измениться всего за несколько дней? Я не понимаю. спасибо.

+0

Похоже, вы пытаетесь запустить другой брокер kafka с таким же идентификатором брокера –

+0

Спасибо. Это очень странно. Я следую той же процедуре других времен. Что я могу попытаться решить проблему? – adellarocca

+0

Попробуйте перезапустить zookeeper, за которым следует kafka один раз –

ответ

0

У меня была эта ошибка в моем Kafka 2.11, работающем под Windows 7. Я думаю, что это исключение не проблема, так как это только информационный уровень. Просто убедитесь, что брокер все еще работает. Даже с этой ошибкой я мог бы:

  1. Создайте и укажите тему kafka-topics.bat.
  2. Использовать тему kafka-console-consumer.bat.
  3. Программно отправьте сообщение producer.send(new ProducerRecord<String, String>("topic", "hello")).
+0

Я подтверждаю это, поскольку я испытываю ту же проблему в своей среде Windows, но она, похоже, не влияет на функциональность –

0

В моем случае это, как представляется, влияет на функциональность, поскольку я не могу использовать сообщения. См. Код ниже

Vertx instance = VertxConfig.getInstance();

Properties consumerConfig = new Properties(); 
    consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); 
    consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); 
    consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); 

// consumerConfig.put (ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "ранний"); // consumerConfig.put (ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

Properties producerConfig = new Properties(); 
    producerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); 
    producerConfig.put("acks", "1"); 

    String topic = "dstv-queue-3"; 
    consumer = KafkaConsumer.create(instance, consumerConfig); 
    producer = KafkaProducer.create(instance, producerConfig, String.class, String.class); 
    consumer.subscribe(topic); 

    instance.setPeriodic(2000, worker -> { 
     KafkaProducerRecord<String, String> record = KafkaProducerRecord.create(topic, "message"); 
     producer.write(record, writeHandler -> { 
      RecordMetadata metadata = writeHandler.result(); 

      //if meta data returned.. 
      if (metadata != null) { 
       long offset = metadata.getOffset(); 
       int partition = metadata.getPartition(); 
       System.out.println("completed write: " + (writeHandler.succeeded() ? "successful" : "failed") + " offset:" + offset + " partition: " + partition); 
      } 
     }); 
    }); 

    AtomicLong counter = new AtomicLong(); 
    consumer.handler(readHandler -> System.out.println(counter.getAndAdd(1) + ". " + readHandler.value() + " was received")); 
+0

Я не понимаю –

Смежные вопросы