Я новый студент, изучающий Кафку, и я столкнулся с некоторыми фундаментальными проблемами, понимая множество потребителей, что статьи, документы и т. Д. Пока не слишком помогли.Как использовать несколько пользователей в Kafka?
Одна вещь, которую я попытался сделать, это написать собственный производитель и потребитель высокого уровня Kafka и запустить их одновременно, опубликовав 100 простых сообщений для темы и получив мой потребитель. Мне удалось это сделать успешно, но когда я пытаюсь ввести второго потребителя, чтобы потреблять из той же темы, что и сообщения, которые были только что опубликованы, он не получает сообщений.
Насколько я понимаю, для каждой темы у вас могут быть потребители из отдельных групп потребителей, и каждая из этих групп потребителей получит полную копию сообщений, выпущенных для какой-либо темы. Это верно? Если нет, то каков был бы правильный способ настроить несколько потребителей? Это потребительский класс, который я написал до сих пор:
public class AlternateConsumer extends Thread {
private final KafkaConsumer<Integer, String> consumer;
private final String topic;
private final Boolean isAsync = false;
public AlternateConsumer(String topic, String consumerGroup) {
Properties properties = new Properties();
properties.put("bootstrap.servers", "localhost:9092");
properties.put("group.id", consumerGroup);
properties.put("partition.assignment.strategy", "roundrobin");
properties.put("enable.auto.commit", "true");
properties.put("auto.commit.interval.ms", "1000");
properties.put("session.timeout.ms", "30000");
properties.put("key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer");
properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
consumer = new KafkaConsumer<Integer, String>(properties);
consumer.subscribe(topic);
this.topic = topic;
}
public void run() {
while (true) {
ConsumerRecords<Integer, String> records = consumer.poll(0);
for (ConsumerRecord<Integer, String> record : records) {
System.out.println("We received message: " + record.value() + " from topic: " + record.topic());
}
}
}
}
Кроме того, я заметил, что первоначально я тестировал выше потребление на тему «тест» только с одним разделом. Когда я добавил другого потребителя в существующую потребительскую группу, скажем, «testGroup», это вызвало перебалансировку Kafka, которая замедлила латентность моего потребления на значительную величину в размере секунд. Я думал, что это была проблема с перебалансировкой, поскольку у меня был только один раздел, но когда я создал новую тему «несколько разделов» с 6 разделами, возникли аналогичные проблемы, когда добавление большего количества потребителей в одну и ту же группу потребителей вызвало проблемы с задержкой. Я огляделся, и люди говорят мне, что я должен использовать многопоточного потребителя - может ли кто-нибудь пролить свет на это?
Существует отличный пример потребителя высокого уровня [здесь] (https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example) для kafka '0.8.1'. – chrsblck
@chrsblck Спасибо за ссылку.Я действительно изучил это ранее и, вероятно, не понял его так же хорошо, как мог, не могли бы вы немного объяснить, как этот пример использует потоки? Я не совсем понимаю, что они делают сейчас. –
Один из способов состоит в том, чтобы иметь одинаковое количество потоков в качестве разделов для данной темы. Из статьи - Возьмите список потоков 'List> streams = consumerMap.get (topic);' ... Затем присвойте каждому потоку раздел 'executor.submit (новый ConsumerTest (поток, threadNumber)) '. –
chrsblck