2017-01-19 3 views
1

Я использую Kafka v0.10.0.0 и создал Продюсер & Потребительский Java-код. Но код застрял на производителю. Без каких-либо исключений в журналах.Kafka Producer Consumer API Issue

Может ли кто-нибудь помочь. Заранее спасибо.

Я использую/модифицирую программу «mapr - kakfa sample program». Здесь вы можете посмотреть полный код. https://github.com/panwars87/kafka-sample-programs

** Важно: я изменил версию kafka-client на 0.10.0.0 в зависимостях maven и запустил Kafka 0.10.0.0 в моем локальном.

public class Producer { 
public static void main(String[] args) throws IOException { 
    // set up the producer 
    KafkaProducer<String, String> producer; 
    System.out.println("Starting Producers...."); 
    try (InputStream props = Resources.getResource("producer.props").openStream()) { 
     Properties properties = new Properties(); 
     properties.load(props); 
     producer = new KafkaProducer<>(properties); 
     System.out.println("Property loaded successfully ...."); 
    } 

    try { 
     for (int i = 0; i < 20; i++) { 
      // send lots of messages 
      System.out.println("Sending record one by one...."); 
      producer.send(new ProducerRecord<String, String>("fast-messages","sending message - "+i+" to fast-message.")); 

      System.out.println(i+" message sent...."); 
      // every so often send to a different topic 
      if (i % 2 == 0) { 
       producer.send(new ProducerRecord<String, String>("fast-messages","sending message - "+i+" to fast-message.")); 
       producer.send(new ProducerRecord<String, String>("summary-markers","sending message - "+i+" to summary-markers.")); 
       producer.flush(); 
       System.out.println("Sent msg number " + i); 
      } 
     } 
    } catch (Throwable throwable) { 
     System.out.printf("%s", throwable.getStackTrace()); 
     throwable.printStackTrace(); 
    } finally { 
     producer.close(); 
    } 

    } 
} 

public class Consumer { 
public static void main(String[] args) throws IOException { 

    // and the consumer 
    KafkaConsumer<String, String> consumer; 
    try (InputStream props = Resources.getResource("consumer.props").openStream()) { 
     Properties properties = new Properties(); 
     properties.load(props); 
     if (properties.getProperty("group.id") == null) { 
      properties.setProperty("group.id", "group-" + new Random().nextInt(100000)); 
     } 
     consumer = new KafkaConsumer<>(properties); 
    } 
    consumer.subscribe(Arrays.asList("fast-messages", "summary-markers")); 
    int timeouts = 0; 
    //noinspection InfiniteLoopStatement 
    while (true) { 
     // read records with a short timeout. If we time out, we don't really care. 
     ConsumerRecords<String, String> records = consumer.poll(200); 
     if (records.count() == 0) { 
      timeouts++; 
     } else { 
      System.out.printf("Got %d records after %d timeouts\n", records.count(), timeouts); 
      timeouts = 0; 
     } 
     for (ConsumerRecord<String, String> record : records) { 
      switch (record.topic()) { 
       case "fast-messages": 
        System.out.println("Record value for fast-messages is :"+ record.value());    
         break; 
     case "summary-markers": 
      System.out.println("Record value for summary-markers is :"+ record.value()); 
         break; 
       default: 
        throw new IllegalStateException("Shouldn't be possible to get message on topic "); 
      } 
     } 
    } 
    } 
} 
+0

Там много там происходит - конфигурация загрузки, цикл, который отправляет несколько сообщений нескольких тема, флеш вызова и т.д. Вы можете уменьшить это к чему-то меньшему, который производит ошибка и/или дать более подробную информацию - где именно она «застревает»? Успешно ли первое отправление? Второй? Откуда вы знаете, что это застряло? Добавили ли вы регистрацию, чтобы увидеть, какие отправляет работу, а какие нет. –

+0

Вы упомянули, что производитель застрял, но вставил код для потребителя? – amethystic

+0

Я добавил и производителя, и потребителя. – PanwarS87

ответ

0

Код, в котором вы работаете, предназначен для демонстрации карты R, которая не является Кафкой. MapR утверждает API совместимость с Kafka 0.9, но даже тогда mapR обрабатывает смещения сообщений по-разному, что делает Kafka (смещения являются байтами смещения сообщений, а не инкрементными смещениями) и т. Д. Реализация mapR также очень и очень отличается, по меньшей мере. Это означает, что если вам повезет, приложение Kafka 0.9 может просто запускаться на mapR и наоборот. Таких гарантий для других выпусков нет.