2013-08-27 2 views
9

Я только что начал работать с бета-версией Kafka 0.8 1. У меня есть очень простой пример, и проблема в том, что я могу получить только один потребитель сообщения, а не несколько. То есть метод runSingleWorker() WORKS. Метод запуска() не работает:Kafka: Не могу создать несколько потоков Потребители

import kafka.consumer.ConsumerIterator; 
import kafka.consumer.KafkaStream; 
import kafka.consumer.ConsumerConfig; 
import kafka.javaapi.consumer.ConsumerConnector; 

import java.util.Map; 
import java.util.List; 
import java.util.HashMap; 
import java.util.concurrent.Executors; 
import java.util.concurrent.ExecutorService; 

import org.springframework.context.ApplicationContext; 
import org.springframework.context.annotation.AnnotationConfigApplicationContext; 

import com.truecar.inventory.worker.core.application.config.AppConfig; 

public class ConsumerThreadPool { 

    private final ConsumerConnector consumer; 
    private final String topic; 

    private ExecutorService executor; 
    private static ApplicationContext context = new AnnotationConfigApplicationContext(AppConfig.class); 

    public ConsumerThreadPool(String topic) { 
     consumer = kafka.consumer.Consumer.createJavaConsumerConnector((ConsumerConfig)context.getBean("consumerConfig")); 
     this.topic = topic; 
    } 

    public void shutdown() { 
     if (consumer != null) consumer.shutdown(); 
     if (executor != null) executor.shutdown(); 
    } 

    public void run(Integer numThreads) { 
     Map<String, Integer> topicCountMap = new HashMap<String, Integer>(); 

     topicCountMap.put(topic, numThreads); 
     Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap); 
     List<KafkaStream<byte[], byte[]>> topicListeners = consumerMap.get(topic); 

     executor = Executors.newFixedThreadPool(numThreads); 

     for(Integer i = 0; i < numThreads; i++){ 
      KafkaStream<byte[], byte[]> stream = topicListeners.get(i); 
      executor.submit(new Consumer(stream, i)); 
     } 
    } 


    public void runSingleWorker(Integer numThreads) { 
     Map<String, Integer> topicCountMap = new HashMap<String, Integer>(); 

     topicCountMap.put(topic, new Integer(1)); 

     Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap); 

     KafkaStream<byte[], byte[]> stream = consumerMap.get(topic).get(0); 
     ConsumerIterator<byte[], byte[]> it = stream.iterator(); 
     while(true) { 
      try { 
       Thread.sleep(1000); 
      } catch (InterruptedException e) { 
       e.printStackTrace(); 
      } 
      while(it.hasNext()){ 
       System.out.println(new String(it.next().message())); 

      } 
     } 
    } 
} 

А внутри моего игрушечного потребителя:

import kafka.consumer.KafkaStream; 
import kafka.consumer.ConsumerIterator; 

public class Consumer implements Runnable { 

    private KafkaStream kafkaStream; 
    private Integer threadNumber; 

    public Consumer(KafkaStream kafkaStream, Integer threadNumber) { 
     this.threadNumber = threadNumber; 
     this.kafkaStream = kafkaStream; 
    } 

    public void run() { 
     ConsumerIterator<byte[], byte[]> it = kafkaStream.iterator(); 
     System.out.println("Created iterator " + it.toString() + " thread number " + threadNumber); 
     while(true) { 

      try { 
       Thread.sleep(1000); 
      } catch (InterruptedException e) { 
       break; 
      } 

      while(it.hasNext()) { 
       System.out.println("Thread " + threadNumber + ": " + new String(it.next().message())); 
      } 
     } 
     System.out.println("Shutting down Thread: " + threadNumber); 
    } 
} 

Проблема заключается в том, пул рабочих не подхватывает сообщения:

Created iterator empty iterator thread number 3 
Created iterator empty iterator thread number 6 
Created iterator empty iterator thread number 9 
Created iterator empty iterator thread number 7 
Created iterator empty iterator thread number 0 
Created iterator empty iterator thread number 0 
Created iterator empty iterator thread number 8 
Created iterator empty iterator thread number 3 
etc... 

Когда я добавляю сообщения через командную строку продукта, сообщения печатаются в однопоточной рабочей версии, но сообщения не печатаются в многопотоковой ситуации. Что тут происходит? Как я могу это исправить?

Btw, pom.xml для kafka 0.8 не является действительным pom и не будет получать зависимости, поэтому здесь находится pom с полными зависимостями.

<?xml version="1.0" encoding="UTF-8"?> 
<project 
xmlns="http://maven.apache.org/POM/4.0.0" 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
xsi:schemaLocation=" 
    http://maven.apache.org/POM/4.0.0 
    http://maven.apache.org/xsd/maven-4.0.0.xsd"> 
<modelVersion>4.0.0</modelVersion> 
<groupId>group1</groupId> 
<artifactId>artifact1</artifactId> 
<version>0.1.0</version> 
<packaging>jar</packaging> 
<properties> 
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> 
    <org.springframework.version>3.2.4.RELEASE</org.springframework.version> 
</properties> 
<dependencies> 
    <dependency> 
     <groupId>org.springframework</groupId> 
     <artifactId>spring-core</artifactId> 
     <version>3.2.4.RELEASE</version> 
    </dependency> 
    <dependency> 
     <groupId>org.springframework</groupId> 
     <artifactId>spring-context</artifactId> 
     <version>3.2.4.RELEASE</version> 
    </dependency> 
    <dependency> 
     <groupId>org.apache.kafka</groupId> 
     <artifactId>kafka_2.9.2</artifactId> 
     <version>0.8.0-beta1</version> 
    </dependency> 
    <dependency> 
     <groupId>javax.inject</groupId> 
     <artifactId>javax.inject</artifactId> 
     <version>1</version> 
    </dependency> 
    <dependency> 
     <groupId>org.scala-lang</groupId> 
     <artifactId>scala-library</artifactId> 
     <version>2.9.2</version> 
    </dependency> 
    <dependency> 
     <groupId>log4j</groupId> 
     <artifactId>log4j</artifactId> 
     <version>1.2.17</version> 
    </dependency> 
    <dependency> 
     <groupId>com.101tec</groupId> 
     <artifactId>zkclient</artifactId> 
     <version>0.3</version> 
    </dependency> 
    <dependency> 
     <groupId>com.yammer.metrics</groupId> 
     <artifactId>metrics-core</artifactId> 
     <version>2.2.0</version> 
    </dependency> 
</dependencies> 
<build> 
    <finalName>inventory-core</finalName> 
    <plugins> 
     <plugin> 
      <groupId>org.apache.maven.plugins</groupId> 
      <artifactId>maven-compiler-plugin</artifactId> 
      <version>3.0</version> 
      <configuration> 
       <source>1.7</source> 
       <target>1.7</target> 
      </configuration> 
     </plugin> 
     <plugin> 
      <groupId>org.apache.maven.plugins</groupId> 
      <artifactId>maven-jar-plugin</artifactId> 
      <configuration> 
       <archive> 
        <manifest> 
         <mainClass>com.truecar.inventory.worker.core.application.Starter</mainClass> 
        </manifest> 
       </archive> 
      </configuration> 
     </plugin> 
     <plugin> 
      <groupId>org.dstovall</groupId> 
      <artifactId>onejar-maven-plugin</artifactId> 
      <version>1.4.4</version> 
      <executions> 
       <execution> 
        <configuration> 
         <onejarVersion>0.97</onejarVersion> 
         <classifier>onejar</classifier> 
        </configuration> 
        <goals> 
         <goal>one-jar</goal> 
        </goals> 
       </execution> 
      </executions> 
     </plugin> 
    </plugins> 
</build> 
<pluginRepositories> 
    <pluginRepository> 
     <id>onejar-maven-plugin.googlecode.com</id> 
     <url>http://onejar-maven-plugin.googlecode.com/svn/mavenrepo</url> 
    </pluginRepository> 
</pluginRepositories> 
</project> 
+0

Сколько у вас разделов? Если у вас есть только раздел '1' для темы .. вы не можете иметь несколько потоков для чтения из того же раздела. Если вы хотите больше рабочих потоков, вам потребуется больше разделов .. –

ответ

1

Возможно, слишком поздно для опроса, но может быть полезным для других разработчиков. Кажется, вы использовали только один раздел для пары Потребителей - это неправильно. Цитата Documentation:

Поскольку существует несколько разделов это по-прежнему балансирует нагрузку в течение многих случаев потребления. Обратите внимание, однако, что не может быть больше экземпляров потребителей, чем разделы.

Итак, когда вы думаете о Потребителях, вам следует подумать, как разделить сообщения по разделам. В большинстве случаев вам следует использовать некоторую группировку высокого уровня или даже позволить ему быть случайным по умолчанию.

Смежные вопросы