2016-06-29 7 views
0

У меня есть потребитель высокого уровня Kafka.Как проверить потребительское состояние Kafka

public class KafkaHighLevelConsumer implements Runnable { 
    private final KafkaConsumer<String, String> consumer; 
    private final List<String> topics; 
    private final int id; 

    public KafkaHighLevelConsumer(int id, 
         String groupId, 
         List<String> topics,BlockingQueue<String> storyQueue) { 
     this.id = id; 
     this.topics = topics; 
     Properties props = new Properties(); 
     props.put("bootstrap.servers", "localhost:9091"); 
     props.put("group.id", groupId); 
     props.put("key.deserializer", StringDeserializer.class.getName()); 
     props.put("value.deserializer", StringDeserializer.class.getName()); 
     this.consumer = new KafkaConsumer<>(props); 
    } 

    @Override 
    public void run() { 
     try { 
      consumer.subscribe(topics); 

      while (true) { 
       ConsumerRecords<String, String> records = consumer.poll(100); 
       for (ConsumerRecord<String, String> record : records) { 
        Map<String, Object> data = new HashMap<>(); 
        data.put("partition", record.partition()); 
        data.put("offset", record.offset()); 
        data.put("value", record.value()); 
        System.out.println(this.id + ": " + data); 
       } 
      } 
     } catch (WakeupException e) { 
      // ignore for shutdown 
     }finally { 
      consumer.close(); 
     } 
    } 

    public void shutdown() { 
     consumer.wakeup(); 
    } 
} 

Потребитель хорошо работает, но мне нужно следить за состоянием потребителя. Почему у нас нет исключения, если неправильный IP-адрес сервера или порт?

Если я сменил порт на некорректный props.put("bootstrap.servers", "localhost:9091"); на props.put("bootstrap.servers", "localhost:100500"); Я до сих пор не могу получить исключения.

Я хотел бы знать, успешно ли я связан с Кафкой или нет! Можно ли обрабатывать такой случай?

Я использую такой Maven DEPS

<dependency> 
      <groupId>org.apache.kafka</groupId> 
      <artifactId>kafka-clients</artifactId> 
      <version>0.9.0.1</version> 
     </dependency> 

Спасибо!

+0

Вы можете использовать kafka-consumer-groups.sh, чтобы просмотреть тему потребителей и ее состояния. – MGolovanov

+0

Не исключение, так как клиент предполагает, что брокер может просто быть недоступен. Клиент будет подключаться к брокеру, если он будет подключен к Интернету. –

ответ

0

Согласно документации клиента (https://kafka.apache.org/090/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html):

Это будет прозрачно обрабатывать сбой серверов в кластере Кафки

библиотека не может сказать, разница между временным сбоем, постоянный неудача или неправильная конфигурация. Поэтому он считает, что каждый отказ «повторится», и делает именно это, навсегда навсегда, никогда не возвращая ошибки.

Это временное решение, чтобы проверить состояние соединения: Попросите какую-то информацию, которая гарантированно вернется, подождать некоторое разумное время и, если ничего не возвращает, то вы знаете, что соединение имеет некоторые проблемы:

int CONNECTION_TEST_TIMEOUT_SECONDS = 10; // or whatever is appropriate for your environment 

ExecutorService executor = Executors.newSingleThreadExecutor(); 
Runnable testTask = consumer::listTopics; 

Future future = executor.submit(testTask); 
try { 
    future.get(CONNECTION_TEST_TIMEOUT_SECONDS, TimeUnit.SECONDS); 
} catch (TimeoutException te) { 
    consumer.wakeup(); 
    throw new IOException("Could not communicate with the server within " + CONNECTION_TEST_TIMEOUT_SECONDS + " seconds"); 
} catch (InterruptedException e) { 
    // Nothing to do. Maybe a warning in the log? 
} catch (ExecutionException e) { 
    throw new IOException("Exception while running connection test: " + e.getMessage(), e); 
} 
Смежные вопросы