Я пытался интегрировать кафка-шторм. Я только что начал с нескольких примеров.Написание темы кафки, когда потребитель не работает
Мне удалось запустить примеры из GitHub. Затем я пытаюсь написать класс Producer в eclipse для публикации сообщений в теме kafka с использованием API KAFKA PRODUCER.
Scenario1:
Когда мой потребитель-оболочка работает с использованием сказать тему тест, и тест я запустить мой класс производителя. Я могу видеть свою потребительскую оболочку со всеми опубликованными сообщениями.
Scenario2
Я еще не начал свою потребительскую-оболочку (скажем, потребитель вниз). И я запускаю класс продюсера. Сообщения публикуются в кафке.
Теперь, если сообщения опубликованы и теперь после простоя, если я запускаю свою потребительскую оболочку, она не читает уже опубликованные темы.
Почему? Я полагаю, что он поддерживает журнал для потребления темы. Не следует ли читать сообщения?
Есть ли какой-либо параметр конфигурации, который я должен упомянуть?
Properties props = new Properties();
props.put("metadata.broker.list", "localhost:9092");
props.put("zk.connect", "localhost:2181");
props.put("serializer.class", "kafka.serializer.StringEncoder");
props.put("request.required.acks", "1");
ProducerConfig config = new ProducerConfig(props);
Producer<String, String> producer = new Producer<String, String>(config);
for (int nEvents=0; nEvents<events;nEvents++)
{
String ip="192.168.2."+rnd.nextInt(255);
String msg=getNextTradeData(); // Class to generate data
KeyedMessage<String,String> data=new KeyedMessage<String, String>("TradeFrequency",ip,msg);
Thread.sleep(100);
System.out.println(msg);
producer.send(data);
}
producer.close();
}
Или есть что-то, что мне нужно для изменения потребителя. Я использую потребительскую-оболочку, представленную в пакете, и запустить его с помощью
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic first-topic
Спасибо, его работы для меня в этом сценарии. Теперь в этом случае каждый раз, когда он начнется с присвоения журнала, я полагаю (хотя мне нужно еще проверить). Но если я хочу прочитать, где бы он ни находился, или может быть сказано (последнее смещение читать + 1). Каким будет мой подход? – NishantM
@NishantM использует опцию конфигурации потребителя, которая должна позволить вам возобновить работу с последнего смещения –