2015-10-12 4 views
2

Я пытался интегрировать кафка-шторм. Я только что начал с нескольких примеров.Написание темы кафки, когда потребитель не работает

Мне удалось запустить примеры из GitHub. Затем я пытаюсь написать класс Producer в eclipse для публикации сообщений в теме kafka с использованием API KAFKA PRODUCER.

Scenario1:

Когда мой потребитель-оболочка работает с использованием сказать тему тест, и тест я запустить мой класс производителя. Я могу видеть свою потребительскую оболочку со всеми опубликованными сообщениями.

Scenario2

Я еще не начал свою потребительскую-оболочку (скажем, потребитель вниз). И я запускаю класс продюсера. Сообщения публикуются в кафке.

Теперь, если сообщения опубликованы и теперь после простоя, если я запускаю свою потребительскую оболочку, она не читает уже опубликованные темы.

Почему? Я полагаю, что он поддерживает журнал для потребления темы. Не следует ли читать сообщения?

Есть ли какой-либо параметр конфигурации, который я должен упомянуть?

Properties props = new Properties(); 
props.put("metadata.broker.list", "localhost:9092"); 
props.put("zk.connect", "localhost:2181"); 
props.put("serializer.class", "kafka.serializer.StringEncoder"); 
props.put("request.required.acks", "1"); 

ProducerConfig config = new ProducerConfig(props); 
Producer<String, String> producer = new Producer<String, String>(config); 

     for (int nEvents=0; nEvents<events;nEvents++) 
     { 
      String ip="192.168.2."+rnd.nextInt(255); 
      String msg=getNextTradeData(); // Class to generate data 
      KeyedMessage<String,String> data=new KeyedMessage<String, String>("TradeFrequency",ip,msg); 
      Thread.sleep(100); 
      System.out.println(msg); 
      producer.send(data); 

     } 
producer.close(); 

} 

Или есть что-то, что мне нужно для изменения потребителя. Я использую потребительскую-оболочку, представленную в пакете, и запустить его с помощью

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic first-topic 

ответ

1

При запуске kafka-console-consume он будет считывать из тока смещения. Это, смещение NOW(), а не из прошлого.

Чтобы узнать, были опубликованы сообщения, у вас есть два варианта:

  1. Используйте --from-beginning опцию, чтобы читать с самого начала темы

    бен/kafka-console-consumer.sh - -zookeeper локальные: 2181 --topic первой-тема --from-начало

  2. Упорство состояния консоли-потребитель в Zookeeper/Кафках с помощью --consumer.config опции

    бен/kafka-console-consumer.sh --zookeeper локальный: 2181 --topic первой темы --consumer.config /home/sql-injection/consumer-config.txt

согласно это nice page параметры, которые вам нужны для конфигурации пользователя: consumer.id, client.id.

+0

Спасибо, его работы для меня в этом сценарии. Теперь в этом случае каждый раз, когда он начнется с присвоения журнала, я полагаю (хотя мне нужно еще проверить). Но если я хочу прочитать, где бы он ни находился, или может быть сказано (последнее смещение читать + 1). Каким будет мой подход? – NishantM

+0

@NishantM использует опцию конфигурации потребителя, которая должна позволить вам возобновить работу с последнего смещения –