2016-09-22 2 views
3

У меня есть один Kafka-Broker с несколькими темами, каждый из которых имеет один раздел.Kafka Consumer с JAVA

У меня есть потребитель, который работает просто отлично потребляя сообщения от темы

Моя проблема мне нужно улучшить с помощью пут очереди сообщений за счет увеличения количества разделов, скажем, у меня есть четыре разделов на тема, есть способ, которым я могу написать четыре потребителя, каждый из которых указал на отдельный раздел на тему ???

import java.util.*; 
import kafka.consumer.Consumer; 
import kafka.consumer.ConsumerConfig; 
import kafka.consumer.ConsumerIterator; 
import kafka.consumer.KafkaStream; 
import kafka.javaapi.consumer.ConsumerConnector; 

public class KafkaConsumer { 
    private ConsumerConnector consumerConnector = null; 
    private final String topic = "mytopic"; 

    public void initialize() { 
     Properties props = new Properties(); 
     props.put("zookeeper.connect", "localhost:2181"); 
     props.put("group.id", "testgroup"); 
     props.put("zookeeper.session.timeout.ms", "400"); 
     props.put("zookeeper.sync.time.ms", "300"); 
     props.put("auto.commit.interval.ms", "1000"); 
     ConsumerConfig conConfig = new ConsumerConfig(props); 
     consumerConnector = Consumer.createJavaConsumerConnector(conConfig); 
    } 

    public void consume() { 
     //Key = topic name, Value = No. of threads for topic 
     Map<String, Integer> topicCount = new HashMap<String, Integer>();  
     topicCount.put(topic, new Integer(1)); 

     //ConsumerConnector creates the message stream for each topic 
     Map<String, List<KafkaStream<byte[], byte[]>>> consumerStreams = 
       consumerConnector.createMessageStreams(topicCount);   

     // Get Kafka stream for topic 'mytopic' 
     List<KafkaStream<byte[], byte[]>> kStreamList = 
               consumerStreams.get(topic); 
     // Iterate stream using ConsumerIterator 
     for (final KafkaStream<byte[], byte[]> kStreams : kStreamList) { 
       ConsumerIterator<byte[], byte[]> consumerIte = kStreams.iterator(); 

       while (consumerIte.hasNext()) 
         System.out.println("Message consumed from topic 
            [" + topic + "] : "  + 
             new String(consumerIte.next().message()));    
     } 
     //Shutdown the consumer connector 
     if (consumerConnector != null) consumerConnector.shutdown();   
    } 

    public static void main(String[] args) throws InterruptedException { 
     KafkaConsumer kafkaConsumer = new KafkaConsumer(); 
     // Configure Kafka consumer 
     kafkaConsumer.initialize(); 
     // Start consumption 
     kafkaConsumer.consume(); 
    } 

}

ответ

1

По существу, все, что вам нужно сделать, это запустить несколько потребителей, которые все в одной и той же группы потребителей. Если вы используете нового потребителя от kafka 0.9 или более поздней версии, или если вы используете потребителя высокого уровня, kafka позаботится о разделении разделов, следя за тем, чтобы каждый раздел читался одним потребителем. Если у вас больше разделов, чем у потребителей, некоторые потребители получат сообщения из нескольких разделов, но ни один раздел никогда не будет прочитан более чем одним потребителем из той же группы потребителей, чтобы сообщения не дублировались. Таким образом, вы никогда не хотите больше потребителей, чем разделов, поскольку некоторые потребители будут бездействовать. Вы можете также точно настроить, какой потребитель читает каждую секцию, используя простого пользователя https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example

Кажется, вы используете старого потребителя от Kafka 0.8 или раньше. Вы можете рассмотреть возможность перехода на нового потребителя. http://kafka.apache.org/documentation.html#intro_consumers

Вот еще одна хорошая статья с подробными примерами написания потребителей с использованием нового потребителя: http://www.confluent.io/blog/tutorial-getting-started-with-the-new-apache-kafka-0-9-consumer-client/

Смежные вопросы