В моем случае это, как представляется, влияет на функциональность, поскольку я не могу использовать сообщения. См. Код ниже
Vertx instance = VertxConfig.getInstance();
Properties consumerConfig = new Properties();
consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
// consumerConfig.put (ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "ранний"); // consumerConfig.put (ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
Properties producerConfig = new Properties();
producerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
producerConfig.put("acks", "1");
String topic = "dstv-queue-3";
consumer = KafkaConsumer.create(instance, consumerConfig);
producer = KafkaProducer.create(instance, producerConfig, String.class, String.class);
consumer.subscribe(topic);
instance.setPeriodic(2000, worker -> {
KafkaProducerRecord<String, String> record = KafkaProducerRecord.create(topic, "message");
producer.write(record, writeHandler -> {
RecordMetadata metadata = writeHandler.result();
//if meta data returned..
if (metadata != null) {
long offset = metadata.getOffset();
int partition = metadata.getPartition();
System.out.println("completed write: " + (writeHandler.succeeded() ? "successful" : "failed") + " offset:" + offset + " partition: " + partition);
}
});
});
AtomicLong counter = new AtomicLong();
consumer.handler(readHandler -> System.out.println(counter.getAndAdd(1) + ". " + readHandler.value() + " was received"));
Похоже, вы пытаетесь запустить другой брокер kafka с таким же идентификатором брокера –
Спасибо. Это очень странно. Я следую той же процедуре других времен. Что я могу попытаться решить проблему? – adellarocca
Попробуйте перезапустить zookeeper, за которым следует kafka один раз –