2015-06-17 2 views
20

Я новый студент, изучающий Кафку, и я столкнулся с некоторыми фундаментальными проблемами, понимая множество потребителей, что статьи, документы и т. Д. Пока не слишком помогли.Как использовать несколько пользователей в Kafka?

Одна вещь, которую я попытался сделать, это написать собственный производитель и потребитель высокого уровня Kafka и запустить их одновременно, опубликовав 100 простых сообщений для темы и получив мой потребитель. Мне удалось это сделать успешно, но когда я пытаюсь ввести второго потребителя, чтобы потреблять из той же темы, что и сообщения, которые были только что опубликованы, он не получает сообщений.

Насколько я понимаю, для каждой темы у вас могут быть потребители из отдельных групп потребителей, и каждая из этих групп потребителей получит полную копию сообщений, выпущенных для какой-либо темы. Это верно? Если нет, то каков был бы правильный способ настроить несколько потребителей? Это потребительский класс, который я написал до сих пор:

public class AlternateConsumer extends Thread { 
    private final KafkaConsumer<Integer, String> consumer; 
    private final String topic; 
    private final Boolean isAsync = false; 

    public AlternateConsumer(String topic, String consumerGroup) { 
     Properties properties = new Properties(); 
     properties.put("bootstrap.servers", "localhost:9092"); 
     properties.put("group.id", consumerGroup); 
     properties.put("partition.assignment.strategy", "roundrobin"); 
     properties.put("enable.auto.commit", "true"); 
     properties.put("auto.commit.interval.ms", "1000"); 
     properties.put("session.timeout.ms", "30000"); 
     properties.put("key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer"); 
     properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); 
     consumer = new KafkaConsumer<Integer, String>(properties); 
     consumer.subscribe(topic); 
     this.topic = topic; 
    } 


    public void run() { 
     while (true) { 
      ConsumerRecords<Integer, String> records = consumer.poll(0); 
      for (ConsumerRecord<Integer, String> record : records) { 
       System.out.println("We received message: " + record.value() + " from topic: " + record.topic()); 
      } 
     } 

    } 
} 

Кроме того, я заметил, что первоначально я тестировал выше потребление на тему «тест» только с одним разделом. Когда я добавил другого потребителя в существующую потребительскую группу, скажем, «testGroup», это вызвало перебалансировку Kafka, которая замедлила латентность моего потребления на значительную величину в размере секунд. Я думал, что это была проблема с перебалансировкой, поскольку у меня был только один раздел, но когда я создал новую тему «несколько разделов» с 6 разделами, возникли аналогичные проблемы, когда добавление большего количества потребителей в одну и ту же группу потребителей вызвало проблемы с задержкой. Я огляделся, и люди говорят мне, что я должен использовать многопоточного потребителя - может ли кто-нибудь пролить свет на это?

+0

Существует отличный пример потребителя высокого уровня [здесь] (https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example) для kafka '0.8.1'. – chrsblck

+0

@chrsblck Спасибо за ссылку.Я действительно изучил это ранее и, вероятно, не понял его так же хорошо, как мог, не могли бы вы немного объяснить, как этот пример использует потоки? Я не совсем понимаю, что они делают сейчас. –

+0

Один из способов состоит в том, чтобы иметь одинаковое количество потоков в качестве разделов для данной темы. Из статьи - Возьмите список потоков 'List > streams = consumerMap.get (topic);' ... Затем присвойте каждому потоку раздел 'executor.submit (новый ConsumerTest (поток, threadNumber)) '. – chrsblck

ответ

17

Я думаю, что ваша проблема кроется в свойстве auto.offset.reset. Когда новый потребитель читает из раздела и нет предыдущего зафиксированного смещения, свойство auto.offset.reset используется для определения того, что должно быть начальным смещением. Если вы установите его на «самый большой» (по умолчанию), вы начнете читать последнее (последнее) сообщение. Если вы установите его на «самый маленький», вы получите первое доступное сообщение.

Так добавить:

properties.put("auto.offset.reset", "smallest"); 

и повторите попытку.

+1

Это поздний ответ, но спасибо Крису! Ваши решения верны, и, посмотрев более подробно на некоторые документы, я должен был заметить, что при запуске нового потребителя он настроен на потребление только самых новых отправленных сообщений - НЕ ранее существовавших, если не установлены указанные выше свойства. –

4

В документации here говорится: «если вы предоставляете больше потоков, чем есть разделы по теме, некоторые потоки никогда не будут видеть сообщение». Можете ли вы добавить разделы в свою тему? У меня есть счетчик по количеству пользователей, равный количеству разделов в моей теме, и каждый поток получает сообщения.

Вот моя тема конфигурация:

buffalo-macbook10:kafka_2.10-0.8.2.1 aakture$ bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic recent-wins 
Topic:recent-wins PartitionCount:3 ReplicationFactor:1 Configs: 
Topic: recent-wins Partition: 0 Leader: 0 Replicas: 0 Isr: 0 
Topic: recent-wins Partition: 1 Leader: 0 Replicas: 0 Isr: 0 
Topic: recent-wins Partition: 2 Leader: 0 Replicas: 0 Isr: 0 

И мой потребитель:

package com.cie.dispatcher.services; 

import com.cie.dispatcher.model.WinNotification; 
import com.fasterxml.jackson.databind.ObjectMapper; 
import com.google.inject.Inject; 
import io.dropwizard.lifecycle.Managed; 
import kafka.consumer.ConsumerConfig; 
import kafka.consumer.ConsumerIterator; 
import kafka.consumer.KafkaStream; 
import kafka.javaapi.consumer.ConsumerConnector; 
import org.slf4j.Logger; 
import org.slf4j.LoggerFactory; 

import java.util.HashMap; 
import java.util.List; 
import java.util.Map; 
import java.util.Properties; 
import java.util.concurrent.ExecutorService; 
import java.util.concurrent.Executors; 
import java.util.concurrent.TimeUnit; 

/** 
* This will create three threads, assign them to a "group" and listen for notifications on a topic. 
* Current setup is to have three partitions in Kafka, so we need a thread per partition (as recommended by 
* the kafka folks). This implements the dropwizard Managed interface, so it can be started and stopped by the 
* lifecycle manager in dropwizard. 
* <p/> 
* Created by aakture on 6/15/15. 
*/ 
public class KafkaTopicListener implements Managed { 
private static final Logger LOG = LoggerFactory.getLogger(KafkaTopicListener.class); 
private final ConsumerConnector consumer; 
private final String topic; 
private ExecutorService executor; 
private int threadCount; 
private WinNotificationWorkflow winNotificationWorkflow; 
private ObjectMapper objectMapper; 

@Inject 
public KafkaTopicListener(String a_zookeeper, 
          String a_groupId, String a_topic, 
          int threadCount, 
          WinNotificationWorkflow winNotificationWorkflow, 
          ObjectMapper objectMapper) { 
    consumer = kafka.consumer.Consumer.createJavaConsumerConnector(
      createConsumerConfig(a_zookeeper, a_groupId)); 
    this.topic = a_topic; 
    this.threadCount = threadCount; 
    this.winNotificationWorkflow = winNotificationWorkflow; 
    this.objectMapper = objectMapper; 
} 

/** 
* Creates the config for a connection 
* 
* @param zookeeper the host:port for zookeeper, "localhost:2181" for example. 
* @param groupId the group id to use for the consumer group. Can be anything, it's used by kafka to organize the consumer threads. 
* @return the config props 
*/ 
private static ConsumerConfig createConsumerConfig(String zookeeper, String groupId) { 
    Properties props = new Properties(); 
    props.put("zookeeper.connect", zookeeper); 
    props.put("group.id", groupId); 
    props.put("zookeeper.session.timeout.ms", "400"); 
    props.put("zookeeper.sync.time.ms", "200"); 
    props.put("auto.commit.interval.ms", "1000"); 

    return new ConsumerConfig(props); 
} 

public void stop() { 
    if (consumer != null) consumer.shutdown(); 
    if (executor != null) executor.shutdown(); 
    try { 
     if (!executor.awaitTermination(5000, TimeUnit.MILLISECONDS)) { 
      LOG.info("Timed out waiting for consumer threads to shut down, exiting uncleanly"); 
     } 
    } catch (InterruptedException e) { 
     LOG.info("Interrupted during shutdown, exiting uncleanly"); 
    } 
    LOG.info("{} shutdown successfully", this.getClass().getName()); 
} 
/** 
* Starts the listener 
*/ 
public void start() { 
    Map<String, Integer> topicCountMap = new HashMap<>(); 
    topicCountMap.put(topic, new Integer(threadCount)); 
    Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap); 
    List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic); 
    executor = Executors.newFixedThreadPool(threadCount); 
    int threadNumber = 0; 
    for (final KafkaStream stream : streams) { 
     executor.submit(new ListenerThread(stream, threadNumber)); 
     threadNumber++; 
    } 
} 

private class ListenerThread implements Runnable { 
    private KafkaStream m_stream; 
    private int m_threadNumber; 

    public ListenerThread(KafkaStream a_stream, int a_threadNumber) { 
     m_threadNumber = a_threadNumber; 
     m_stream = a_stream; 
    } 

    public void run() { 
     try { 
      String message = null; 
      LOG.info("started listener thread: {}", m_threadNumber); 
      ConsumerIterator<byte[], byte[]> it = m_stream.iterator(); 
      while (it.hasNext()) { 
       try { 
        message = new String(it.next().message()); 
        LOG.info("receive message by " + m_threadNumber + " : " + message); 
        WinNotification winNotification = objectMapper.readValue(message, WinNotification.class); 
        winNotificationWorkflow.process(winNotification); 
       } catch (Exception ex) { 
        LOG.error("error processing queue for message: " + message, ex); 
       } 
      } 
      LOG.info("Shutting down listener thread: " + m_threadNumber); 
     } catch (Exception ex) { 
      LOG.error("error:", ex); 
     } 
    } 
    } 
} 
+0

Можете ли вы поделиться примером для версии Kafka 1.0, так как большинство классов, используемых в приведенном выше примере, устарели. –

+0

Я не верю, что это было в то время, я не могу обойтись, чтобы обновить свой код очень скоро, извинения. –

4

Если вы хотите использовать несколько потребителей потреблять такое же сообщения (как трансляция), вы можете породить их с различными группами потребителей а также установив auto.offset.reset в наименьший в потребительской конфигурации. Если вы хотите, чтобы несколько потребителей закончили параллельное использование (разделите работу между ними), вы должны создать количество разделов> = количество потребителей. Один раздел может потребляться не более чем одним потребительским процессом. Но один потребитель может потреблять более одного раздела.