У меня есть потребитель высокого уровня Kafka.Как проверить потребительское состояние Kafka
public class KafkaHighLevelConsumer implements Runnable {
private final KafkaConsumer<String, String> consumer;
private final List<String> topics;
private final int id;
public KafkaHighLevelConsumer(int id,
String groupId,
List<String> topics,BlockingQueue<String> storyQueue) {
this.id = id;
this.topics = topics;
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9091");
props.put("group.id", groupId);
props.put("key.deserializer", StringDeserializer.class.getName());
props.put("value.deserializer", StringDeserializer.class.getName());
this.consumer = new KafkaConsumer<>(props);
}
@Override
public void run() {
try {
consumer.subscribe(topics);
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
Map<String, Object> data = new HashMap<>();
data.put("partition", record.partition());
data.put("offset", record.offset());
data.put("value", record.value());
System.out.println(this.id + ": " + data);
}
}
} catch (WakeupException e) {
// ignore for shutdown
}finally {
consumer.close();
}
}
public void shutdown() {
consumer.wakeup();
}
}
Потребитель хорошо работает, но мне нужно следить за состоянием потребителя. Почему у нас нет исключения, если неправильный IP-адрес сервера или порт?
Если я сменил порт на некорректный props.put("bootstrap.servers", "localhost:9091");
на props.put("bootstrap.servers", "localhost:100500");
Я до сих пор не могу получить исключения.
Я хотел бы знать, успешно ли я связан с Кафкой или нет! Можно ли обрабатывать такой случай?
Я использую такой Maven DEPS
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.9.0.1</version>
</dependency>
Спасибо!
Вы можете использовать kafka-consumer-groups.sh, чтобы просмотреть тему потребителей и ее состояния. – MGolovanov
Не исключение, так как клиент предполагает, что брокер может просто быть недоступен. Клиент будет подключаться к брокеру, если он будет подключен к Интернету. –