2015-03-25 2 views
1

Я пытаюсь использовать простой потребительский клиент-производитель kafka api, мой класс продюсера отлично работает, поскольку я могу видеть сообщение у потребителя с консоли, но когда я запускаю потребительский код, ничего не отображается, i я не получаю то, что проблема или где я делаю ошибкуКод пользователя kafka не работает полностью

это код производитель -

package com.app.test; 
import java.util.Properties; 
import kafka.javaapi.producer.Producer; 
import kafka.producer.KeyedMessage; 
import kafka.producer.ProducerConfig; 

public class SimpleProducer { 
    private static Producer<Integer, String> producer; 
    private final Properties props = new Properties();public SimpleProducer() 
    { 
     props.put("metadata.broker.list", "localhost:9092"); 
     props.put("serializer.class", "kafka.serializer.StringEncoder"); 
     props.put("request.required.acks", "1"); 
     producer = new Producer<Integer, String>(new ProducerConfig(props)); 
    } 
public static void main(String[] args) { 
    SimpleProducer sp = new SimpleProducer(); 
    String topic = "test4"; 
    String messageStr = "hello"; 
    KeyedMessage<Integer, String> data = new KeyedMessage<Integer, String>(topic, messageStr); 
    System.out.println("producer : "+producer); 
    producer.send(data); 
    producer.close(); 
    } 
} 

потребительского класса -

package com.app.test; 

import java.util.HashMap; 
import java.util.List; 
import java.util.Map; 
import java.util.Properties; 

import kafka.consumer.Consumer; 
import kafka.consumer.ConsumerConfig; 
import kafka.consumer.ConsumerIterator; 
import kafka.consumer.KafkaStream; 
import kafka.javaapi.consumer.ConsumerConnector; 

public class SimpleHLConsumer { 

    private final ConsumerConnector consumer; 
    private final String topic; 

    public SimpleHLConsumer(String zookeeper, String groupId, String topic) { 
     Properties props = new Properties(); 
    props.put("zookeeper.connect", zookeeper); 
    props.put("group.id", groupId); 
    props.put("zookeeper.session.timeout.ms", "500"); 
    props.put("zookeeper.sync.time.ms", "250"); 
    props.put("auto.commit.interval.ms", "1000"); 

    consumer = Consumer.createJavaConsumerConnector(
    new ConsumerConfig(props)); 
    this.topic = topic; 
    } 



    public void testConsumer() { 


    Map<String, Integer> topicCount = new HashMap<String, Integer>(); 
     // Define single thread for topic 
    topicCount.put(topic, new Integer(1));  

    System.out.println("check1"); 

    Map<String, List<KafkaStream<byte[], byte[]>>> consumerStreams = consumer.createMessageStreams(topicCount); 

    System.out.println("check2"); 
    List<KafkaStream<byte[], byte[]>> streams = consumerStreams.get(topic); 

    for (KafkaStream stream : streams) { 
     System.out.println("test----"); 
     System.out.println("test----"+stream.toString()); 
     ConsumerIterator<byte[], byte[]> consumerIte = stream.iterator(); 
     while (consumerIte.hasNext()) { 
      try { 
       System.out.println("Message from Single Topic: " + new String(consumerIte.next().message(), "UTF-8")); 
      } catch (UnsupportedEncodingException e) { 
       // TODO Auto-generated catch block 
       e.printStackTrace(); 
      } 
     } 
    } 
    if (consumer != null) 
     consumer.shutdown(); 
    } 



    public static void main(String[] args) { 

    String topic = "test4"; 
SimpleHLConsumer simpleHLConsumer = new SimpleHLConsumer("localhost:2181", "testgroup", topic); 
    simpleHLConsumer.testConsumer(); 
    } 
} 

для с hecking, я применил 2 проверки с помощью sysout в методе testConsumer(), поэтому пока работает только check1, т.е. код не достигает check2, я думаю, что есть некоторая проблема с consumer.createMessageStreams(topicCount);, так что в чем причина и как я могу ее решить ?

+0

Комментарии не предназначены для расширенного обсуждения; этот разговор был [перемещен в чат] (http://chat.stackoverflow.com/rooms/73851/discussion-on-question-by-nsu-kafka-consumer-code-is-not-running-completely). – Taryn

ответ

1

В коде нет никаких проблем.

Просто создайте банку, в том числе kafka-version (версия kafka, которую вы используете) в своей банке, и запустите ее и попробуйте отправить сообщение с консоли производителя.

Надеюсь, это может вам помочь.

Смежные вопросы