2015-07-23 2 views
5

Я написал продюсер Kafka в NodeJS и Kafka Consumer в Java Maven. Моя тема «тест», который был создан с помощью следующей команды:BrokerNotAvailableError: Не удалось найти лидера Исключение во время Spark Streaming

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test 

Производитель в NodeJS:

var kafka = require('kafka-node'); 
var Producer = kafka.Producer; 
var Client = kafka.Client; 
var client = new Client('localhost:2181'); 
var producer = new Producer(client); 

producer.on('ready', function() { 
    producer.send([ 
     { topic: 'test', partition: 0, messages: ["This is the zero message I am sending from Kafka to Spark"], attributes: 0}, 
     { topic: 'test', partition: 1, messages: ["This is the first message I am sending from Kafka to Spark"], attributes: 0}, 
     { topic: 'test', partition: 2, messages: ["This is the second message I am sending from Kafka to Spark"], attributes: 0} 
     ], function (err, result) { 
     console.log(err || result); 
     process.exit(); 
    }); 
}); 

Когда я отправить два сообщения от производителя NodeJS, успешно потребляются Java Потребителя. Но когда я посылаю три или более сообщений от производителя NodeJS, это дает мне следующую ошибку:

{[BrokerNotAvailableError: Не удалось найти лидера] сообщение: «Не удалось найти лидера»}

Я хочу спросить что как я могу установить LEADER на любое сообщение в теме «тест». Или что должно быть решением проблемы.

+0

Для достижения большей надежности вы можете запустить большее количество брокеров и включить репликацию по этой теме, чтобы, если ведущий брокер останавливает другого брокера-посредника, он займет позицию, и вы будете меньше работать в ситуации лидеров. – anand

ответ

1

Тема была создана с 1 разделом, однако на стороне производителя вы пытаетесь отправить сообщения на 3 раздела, по логике Kafka не должен найти лидера для других разделов и должен выбросить это исключение.

0

Вместо перегородки используйте разделы во множественном числе.

, например:

producer.on('ready', function() { 
producer.send([ 
    { topic: 'test', partitions: 0, messages: ["This is the zero message I am sending from Kafka to Spark"], attributes: 0}, 
    { topic: 'test', partitions: 1, messages: ["This is the first message I am sending from Kafka to Spark"], attributes: 0}, 
    { topic: 'test', partitions: 2, messages: ["This is the second message I am sending from Kafka to Spark"], attributes: 0} 
    ], function (err, result) { 
    console.log(err || result); 
    process.exit(); 
}); 

});

1

Существует ошибка, которая может привести к этому в текущей версии kafka-node

https://github.com/SOHU-Co/kafka-node/issues/354

HighLevelProducer with KeyedPartitioner fails on first send #354 When using KeyedParitioner with the HighLevelProducer the first send fails with BrokerNotAvailableError: Could not find the leader Consecutive sends work perfectly.

Также см https://www.npmjs.com/package/kafka-node#highlevelproducer-with-keyedpartitioner-errors-on-first-send

который рекомендует

Call client.refreshMetadata() before sending the first message.

Это, как я это сделано

// Refresh metadata required for the first message to go through 
    // https://github.com/SOHU-Co/kafka-node/pull/378 
    client.refreshMetadata([topic], (err) => { 
     if (err) { 
      console.warn('Error refreshing kafka metadata', err); 
     } 
    }); 
Смежные вопросы