2014-02-12 3 views
15

Я использую Kafka 0.8.0 и пытаюсь выполнить описанный ниже сценарий.Задержка в сообщениях потребителей в Apache Kafka

JCA API (Действует как производитель и посылают данные) -----> Потребительские ------> HBase

Я посылаю каждое сообщение к потребителю, как только я принести данных с использованием JCA Client. Например, как только производитель отправляет сообщение №1, я хочу получить его от пользователя и «положить» в HBase. Но мой потребитель начинает получать сообщения после некоторых случайных n сообщений. Я хочу синхронизировать производителя и потребителя, чтобы они оба начали работать вместе.

Я использовал:

1 брокер

1 сингл тему

1 одного производителя и потребителя высокий уровень

Может кто-нибудь предложить, что мне нужно сделать сделать для достижения того же?

EDITED:

Добавление некоторых соответствующий фрагмент кода.

Consumer.java

public class Consumer extends Thread { 
    private final ConsumerConnector consumer; 
    private final String topic; 
    PrintWriter pw = null; 
    int t = 0; 
    StringDecoder kd = new StringDecoder(null); 
    Map<String, Integer> topicCountMap = new HashMap<String, Integer>(); 
    Map<String, List<KafkaStream<String, Signal>>> consumerMap; 
    KafkaStream<String, Signal> stream; 
    ConsumerIterator<String, Signal> it; 

    public Consumer(String topic) { 
     consumer = kafka.consumer.Consumer.createJavaConsumerConnector(createConsumerConfig()); 

     this.topic = topic; 
     topicCountMap.put(topic, new Integer(1)); 
     consumerMap = consumer.createMessageStreams(topicCountMap, kd, new Serializer(
       new VerifiableProperties())); 
     stream = consumerMap.get(topic).get(0); 
     it = stream.iterator(); 

    } 

    private static ConsumerConfig createConsumerConfig() { 
     Properties props = new Properties(); 
     props.put("zookeeper.connect", KafkaProperties.zkConnect); 
     props.put("group.id", KafkaProperties.groupId); 
     props.put("zookeeper.session.timeout.ms", "400"); 
     props.put("zookeeper.sync.time.ms", "200"); 
     props.put("auto.commit.interval.ms", "1000"); 
     props.put("fetch.size", "1024"); 

     return new ConsumerConfig(props); 

    } 

    synchronized public void run() { 

     while (it.hasNext()) { 
      t = (it.next().message()).getChannelid(); 
      System.out.println("In Consumer received msg" + t); 
     } 
    } 
} 

producer.java

public class Producer { 
    public final kafka.javaapi.producer.Producer<String, Signal> producer; 
    private final String topic; 
    private final Properties props = new Properties(); 

    public Producer(String topic) 
    { 
     props.put("serializer.class", "org.bigdata.kafka.Serializer"); 
     props.put("key.serializer.class", "kafka.serializer.StringEncoder"); 
     props.put("metadata.broker.list", "localhost:9092"); 
     // Use random partitioner. Don't need the key type. Just set it to Integer. 
     // The message is of type userdefined Object . 
     producer = new kafka.javaapi.producer.Producer<String,Signal(newProducerConfig(props)); 
     this.topic = topic; 
    } 
} 

KafkaProperties.java

public interface KafkaProperties { 
    final static String zkConnect = "127.0.0.1:2181"; 
    final static String groupId = "group1"; 
    final static String topic = "test00"; 
    final static String kafkaServerURL = "localhost"; 
    final static int kafkaServerPort = 9092; 
    final static int kafkaProducerBufferSize = 64 * 1024; 
    final static int connectionTimeOut = 100000; 
    final static int reconnectInterval = 10000; 
    final static String clientId = "SimpleConsumerDemoClient"; 
} 

Это то, как потребитель ведет себя в течение первых 10 сообщений, которые он не отправляет этому сообщению, полученному потребителем, но с 11-го сообщения он начинает работать правильно.

 producer sending msg1 

    producer sending msg2 

    producer sending msg3 

    producer sending msg4 

    producer sending msg5 

    producer sending msg6 

    producer sending msg7 

    producer sending msg8 

    producer sending msg9 

    producer sending msg10 

    producer sending msg11 

    producer sending msg12 
    In Consumer received msg12 

    producer sending msg13 
    In Consumer received msg13 

    producer sending msg14 
    In Consumer received msg14 

    producer sending msg15 
    In Consumer received msg15 

    producer sending msg16 
    In Consumer received msg16 

    producer sending msg17 
    In Consumer received msg17 

    producer sending msg18 
    In Consumer received msg18 

    producer sending msg19 
    In Consumer received msg19 

    producer sending msg20 
    In Consumer received msg20 

    producer sending msg21 
    In Consumer received msg21 

Редакция: добавление функции слушателя, где производитель отправляют сообщения потребителя. И я использую по умолчанию производителем конфигурационном не перезаписать его

public synchronized void onValueChanged(final MonitorEvent event_) { 


    // Get the value from the DBR 
    try { 
     final DBR dbr = event_.getDBR(); 

     final String[] val = (String[]) dbr.getValue(); 

     producer1.producer.send(new KeyedMessage<String, Signal>   
        (KafkaProperties.topic,new Signal(messageNo))); 
     System.out.println("producer sending msg"+messageNo); 

     messageNo++; 


    } catch (Exception ex) { 
     ex.printStackTrace(); 
    } 
} 
+0

Можете ли вы показать свой производитель и потребительский код/​​конфигурацию? Похоже, что некоторые из них работают с партиями (что хорошо, на самом деле). – Dmitry

+0

@Dmitry добавил фрагмент кода. – Ankita

+0

Потребитель, кажется, в порядке (кроме свойства fetch.size = 1K - это означает, что потребитель не может получать большие сообщения, но, вероятно, это не проблема, которую мы ищем). Можете ли вы поделиться кодами методов newProducerConfig() и run() для производителя? – Dmitry

ответ

8
  1. Попробуйте добавить props.put("request.required.acks", "1") конфигурации производителя. По умолчанию производитель не дожидается, пока не будет гарантирована доставка и доставка сообщений. Итак, если вы начинаете брокер непосредственно перед тестированием, продюсер может начать отправлять сообщения до того, как брокер будет полностью инициализирован, и первые несколько сообщений могут быть потеряны.

  2. Попробуйте добавить props.put("auto.offset.reset", "smallest") в конфигурацию потребителя. Он равен --from-beginning вариант kafka-console-consumer.sh. Если ваш потребитель запускается позже производителя, и нет данных о смещении, сохраненных в Zookeeper, то по умолчанию он начнет потреблять только новые сообщения (см. Consumer configs в документах).

+0

Благодарим за предложение. Добавьте procs.put («request.required.acks», «1») производителю, но программа ведет себя случайным образом. Я запускал программу 5 раз каждый раз с новой темой. Но он дал разные результаты все 5 раз. Попросите продюсера и потребителя синхронизировать время, когда потребитель был задержан. – Ankita

+0

Под «задержкой» вы подразумеваете, что все сообщения были получены, но не сразу после отправки? В исходном выпуске первые несколько сообщений были полностью потеряны. – Dmitry

+0

Да, на самом деле есть два сценария: 1) Иногда все сообщения были получены, но не сразу после отправки. 2) В других случаях несколько сообщений были потеряны, как показано на представленном выходе. Но когда я запускаю эту команду «bin/kafka-console-consumer.sh --zookeeper localhost: 2181 --topic topicname - from-begin» из консоли, я получаю такое же количество сообщений в потребителе, как и производитель. Почему это происходит ??? – Ankita

0

Это может быть связано с большим количеством перегородок, чем нет у потребителей. Убедитесь, что тема создана только с одним разделом, и вы не пропустите ни одного сообщения у потребителя.

Смежные вопросы