2017-02-03 12 views
1

Это потребитель для Apache Kafka и он не получает сообщения из темы «тест»Как подписаться на тему в apache kafka из приложения java?

package com.kafka; 

import java.util.Arrays; 
import java.util.Properties; 

import org.apache.kafka.clients.consumer.ConsumerRecord; 
import org.apache.kafka.clients.consumer.ConsumerRecords; 
import org.apache.kafka.clients.consumer.KafkaConsumer; 

public class ConsumerTest { 

    public static void main(String[] args) { 
    Properties props = new Properties(); 
    props.put("bootstrap.servers", "172.17.210.45:9092"); 
    props.put("zookeeper.connect", "172.17.210.45:2181"); 
    props.put("group.id", "test-consumer-group"); 
    props.put("enable.auto.commit", "true"); 
    props.put("auto.commit.interval.ms", "1000"); 
    props.put("auto.offset.reset", "earliest"); 
    props.put("session.timeout.ms", "30000"); 
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); 
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); 

    KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(props); 
    System.out.println("properties loaded"); 
    kafkaConsumer.subscribe(Arrays.asList("test")); 

    while (true) { 
     ConsumerRecords<String, String> records = kafkaConsumer.poll(100); 
     for (ConsumerRecord<String, String> record : records) { 
      System.out.printf("offset = %d, value = %s", record.offset(), record.value()); 
      System.out.println(); 
     } 
    } 

    } 
} 

В этом результате я не получает никаких сообщений от Апача Кафки.

 log4j:WARN No appenders could be found for logger                            (org.apach e.kafka.clients.consumer.ConsumerConfig). 
    log4j:WARN Please initialize the log4j system properly. 
properties loaded 
+0

Вы нашли то, что не так? – freedev

+0

Вы производите сообщения во время работы потребителя? – oh54

+0

Возможный дубликат [Как создать тему в Kafka через Java] (http://stackoverflow.com/questions/27036923/how-to-create-a-topic-in-kafka-through-java) – Teja

ответ

0

Ваш код кажется правильным. Я предлагаю контролировать, доступен ли ip 172.17.210.45.

ping 172.17.210.45 

и

telnet 172.17.210.45 9092 
telnet 172.17.210.45 2181 

Проверьте существующие темы на сервере

bin/kafka-topics.sh --list --zookeeper 172.17.210.45:2181 

Тогда вы могли бы попытаться переместить потребителя к началу (эта строка должна быть добавлена ​​после kafkaConsumer.subscribe:

kafkaConsumer.seekToBeginning(Collections.emptyList()); 

Наконец-то я предлагаю добавить несколько строк в бесконечном цикле, после kafkaConsumer.poll(100). Просто чтобы посмотреть, не зависает ли запись или что еще.

UPDATE

Если у вас есть один или несколько group.id в продюсерской части, вы должны использовать один из них в потребительской части.

+0

@freedev ... Если я добавлю kafkaConsumer.seekToBeginning (Collections.emptyList()); Это бросает ошибку. – Teja

+0

Здесь я получаю список тем, но не получаю сообщения по определенной теме ... – Teja

+0

Если вы читаете с темы "test", я предполагаю, что тема существует. Я также предлагаю обязательно использовать тот же идентификатор группы, который используется частью производителя. – freedev

Смежные вопросы