2016-11-21 3 views
0

Я пытаюсь записать данные из файла в тему кафки. Мой код выглядит так:Производитель Kafka пропускает сообщения

Properties properties = new Properties(); 
    properties.put("bootstrap.servers", <bootstrapServers>); 
    properties.put("key.serializer", StringSerializer.class.getCanonicalName()); 
    properties.put("value.serializer", StringSerializer.class.getCanonicalName()); 
    properties.put("retries",100); 
    properties.put("linger.ms",5); 
    properties.put("acks", "all"); 

    KafkaProducer<Object, String> producer = new KafkaProducer<>(properties); 

    try (BufferedReader bf = new BufferedReader(new InputStreamReader(new FileInputStream(filePath), "UTF-8"))) { 
     String line; 
     int count = 0; 
     while ((line = bf.readLine()) != null) { 
      count++; 
      producer.send(new ProducerRecord<>(topicName, line)); 
     } 
    producer.flush(); 
     Logger.log("Done producing data messages. Total no of records produced:" + count); 
    } catch (InterruptedException | ExecutionException | IOException e) { 
     Throwables.propagate(e); 
    } finally { 
     producer.close(); 
    } 

Размер данных превышает 1 млн. Записей.

Когда я проверяю смещение данных по брокерам, используя следующую команду, есть только половина сообщений (около 5,00,000) написаны на тему:

./kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list <broker_list> --time -1 --topic <topic_name> 

Вывод указанной команды:

topic_name:1:292954 
topic_name:0:296787 

Какие изменения я должен сделать в подходе, чтобы убедиться, что все они написаны по этой теме.

+0

Можете ли вы показать фактический вывод команды GetOffsetShell? – C4stor

+0

Добавлен вывод в вопрос. –

+0

Каково значение count в файле журнала приложений? Показывает ли он 1 м? – notionquest

ответ

0

Сообщение отправлено асинхронно. Вы можете проверять смещения перед обработкой всех сообщений.

+0

Задержка журнала составляет 24 часа. И я проверяю сообщения сразу после создания сообщения, которое занимает едва 4-5 минут. –

+0

вы полностью изменили ответ. –

+0

Да, я плохо думал об этом и понял, что удержание не будет проблемой, я забыл обновить перед изменением –

Смежные вопросы