2015-03-11 2 views
1

Я совершенно новичок в Kafka и пытаюсь запустить примеры потребителей на https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example, но он не получает сообщений.Потребитель Kafka не получает сообщения при запуске примера на kafka.apache.org

Вот выход в консоли Eclipse:

log4j:WARN No appenders could be found for logger (kafka.utils.VerifiableProperties). 
log4j:WARN Please initialize the log4j system properly. 
Shutting down Thread: 2 
Shutting down Thread: 0 
Shutting down Thread: 1 

и ниже мой код для потребителя

public class ConsumerDemo { 
private final ConsumerConnector consumer; //why private final 
private final String topic; 
private ExecutorService executor; 

public ConsumerDemo(String a_zookeeper,String a_groupId,String a_topic) 
{ 
    consumer = Consumer.createJavaConsumerConnector(createConsumerConfig(a_zookeeper,a_groupId)); 
    this.topic=a_topic; 
} 

public void shutdown() 
{ 
    if(consumer != null) 
     consumer.shutdown(); 
    if(executor != null) 
     executor.shutdown(); 
} 

public void run(int numThreads) 
{ 
    Map<String,Integer> topicCountMap= new HashMap<String,Integer>(); 
    topicCountMap.put(topic, new Integer(numThreads)); 
    Map<String,List<KafkaStream<byte[],byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap); 
    List<KafkaStream<byte[],byte[]>> streams = consumerMap.get(topic); 
    int m=streams.size(); 

    executor = Executors.newFixedThreadPool(numThreads); 

    int threadNumber=0; 
    for(final KafkaStream stream : streams) 
    { 
     executor.submit(new ConsumerMsgTask(stream,threadNumber)); 
     threadNumber++; 
    } 
} 

private static ConsumerConfig createConsumerConfig(String a_zookeeper,String a_groupId) 
{ 
    Properties props = new Properties(); 
    props.put("zookeeper.connect", a_zookeeper); 
    props.put("group.id", a_groupId); 
    props.put("auto.offset.reset", "smallest"); 
    props.put("zookeeper.session.timeout.ms", "4000"); 
    props.put("zookeeper.sync.time.ms", "200"); 
    props.put("auto.commit.interval.ms", "1000"); 

    return new ConsumerConfig(props); 
} 

public static void main(String[] arg) 
{ 
    String[] args = {"192.168.0.123:2181","group-a","test1","3"}; 
    String zooKeeper = args[0]; 
    String groupId = args[1]; 
    String topic = args[2]; 
    int threads = Integer.parseInt(args[3]); 

    ConsumerDemo demo = new ConsumerDemo(zooKeeper,groupId,topic); 
    demo.run(threads); 

    try 
    { 
     Thread.sleep(10000); 
    }catch (InterruptedException ie) 
    { 

    } 
    demo.shutdown(); 

} 

и это ConsumerMsgTask

public class ConsumerMsgTask implements Runnable { 
private KafkaStream<byte[], byte[]> m_stream; 
private int m_threadNumber; 

public ConsumerMsgTask(KafkaStream<byte[], byte[]> stream,int threadNumber) 
{ 
    m_threadNumber = threadNumber; 
    m_stream = stream; 
} 

public void run() 
{ 
    ConsumerIterator<byte[],byte[]> it = m_stream.iterator(); 
    int i=it.size(); 
    while(it.hasNext()) 
     System.out.println("Thread "+m_threadNumber+": "+ new String(it.next().message())); 
    System.out.println("Shutting down Thread: " + m_threadNumber); 
} 

и это мой ProducerDemo

public class ProducerDemo { 
public static void main(String[] args) 
{ 
    Random rnd= new Random(); 
    int events=100; 

    Properties props= new Properties(); 
    props.put("metadata.broker.list", "192.168.0.123:9092,192.168.0.123:9093,192.168.0.123:9094"); 
    props.put("serializer.class", "kafka.serializer.StringEncoder"); 
    props.put("request.required.acks", "1"); 

    ProducerConfig config = new ProducerConfig(props); 

    Producer<String,String> producer=new Producer<String,String>(config); 
    long start=System.currentTimeMillis(); 
    for(int i=0;i<events;i++) 
    { 
     long runtime=new Date().getTime(); 
     String ip="192.168.5."+rnd.nextInt(255); 
     String msgs=runtime+",www.maodou.com,"+ip; 
     KeyedMessage<String,String> data=new KeyedMessage<String,String>("test1",ip,msgs); 
     producer.send(data); 
    } 
    System.out.println("time:"+(System.currentTimeMillis()-start)); 
    producer.close(); 

} 

}

Я создал тему 'test1' по команде ниже

$ bin/kafka-topics.sh --create --zookeeper 192.168.0.123:2181 --replication-factor 3 --partitions 3 --topic test1 

Это использование Кафка 0.8.2 работает на CentOS версии 6.5 (Final), с помощью OpenJDK «1.7. 0_45" .

+0

Вы пытались использовать из командной строки? Если вы не попробуете это и скажите мне, потреблял ли он какие-либо сообщения. bin/kafka-console-consumer.sh --zookeeper 192.168.0.123:2181 --topic test1 - от начала –

+0

Я пробовал и потреблял 100 сообщений. – April

ответ

0

У вас есть код в вашем основном()? Если это так, его удаление решит проблему. Мы сталкиваемся с той же проблемой, что и основной поток пытается отключить пользователя через 10 секунд. Возможно, вам придется реализовывать изящное завершение/запуск с использованием Apache Commons Cli и Apache Commons Daemon.

try { 
    Thread.sleep(10000); 
} catch (InterruptedException ie) { 

} 
example.shutdown(); 
Смежные вопросы