2017-02-02 3 views
0

Наш потоковый поток Flink публикует сообщения в Kafka. Механизм «повторения» KafkaProducer не срабатывает до тех пор, пока в его внутренний буфер не добавится сообщение.Fink: Потеря данных KafkaProducer

Если перед этим существует исключение, KafkaProducer выкинет это исключение и похоже, что Flink не справляется с этим. В этом случае будет потеря данных.

Похожие Flink код (FlinkKafkaProducerBase):

if (logFailuresOnly) { 
      callback = new Callback() { 
       @Override 
       public void onCompletion(RecordMetadata metadata, Exception e) { 
        if (e != null) { 
         LOG.error("Error while sending record to Kafka: " + e.getMessage(), e); 
        } 
        acknowledgeMessage(); 
       } 
      }; 
     } 
     else { 
      callback = new Callback() { 
       @Override 
       public void onCompletion(RecordMetadata metadata, Exception exception) { 
        if (exception != null && asyncException == null) { 
         asyncException = exception; 
        } 
        acknowledgeMessage(); 
       } 
      }; 
     } 

Вот сценарий-х мы определили, что приведет к потере данных:

  1. Все Кафка брокеры вниз.

    В этом случае перед добавлением сообщения в его буфер KafkaProducer пытается получить метаданные. Если KafkaProducer не может получить метаданные в настроенном тайм-ауте, он выдает исключение.

  2. -Память записи не доступен для записи (существующий ошибка в библиотеке KAFKA 0.9.0.1)

https://issues.apache.org/jira/browse/KAFKA-3594

В обоих вышеуказанных случаях, KafkaProducer не будет повторять, и Флинка будет игнорировать сообщения , сообщения даже не регистрируются. Исключение составляет, но не сообщения, которые не удались.

Возможные обходные пути (настройки Кафка):

  1. Очень высокое значение тайм-аута метаданных (metadata.fetch.timeout.ms)
  2. Очень высокое значение для буфера истечения срока действия (request.timeout.ms)

Мы по-прежнему изучаем возможные побочные эффекты изменения вышеуказанных настроек кафки.

Итак, наше понимание правильное? Или можно ли избежать этой потери данных, изменив некоторые настройки Flink?

Спасибо.

ответ

0

Вот что я думаю о ваших вопросах. Смотрите один из Кафки гарантирует первое:

Для темы с фактором репликации N, мы будем терпеть до сбоев сервера N-1 без потери каких-либо записей , совершенных в журнал.

Во-первых, он заботится о сообщениях или записях, зафиксированных в журнале. Любые записи, которые не были доставлены, не считаются совершенными. Во-вторых, если все ваши брокеры были опущены, будет некоторая потеря данных.

Настройки, приведенные ниже, что мы используем, чтобы предотвратить потерю данных на стороне производителя:

  • block.on.buffer.полный = истинное
  • ACKs = все
  • повторы = max_value
  • max.in.flight.requests.per.connection = 1
  • Использование KafkaProducer.send (запись, обратный вызов) вместо отправки (запись)
  • unclean.leader.election.enable = ложь
  • replication.factor> min.insync.replicas
  • min.insync.replicas> 1
+0

Привет Аметист IC. Спасибо за ваш ответ. Да, у нас есть все вышеперечисленные настройки. Таким образом, даже при всех брокерах мы не должны потерять данные. На самом деле это ошибка в Flink. Пожалуйста, проверьте: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Fink-KafkaProducer-Data-Loss-td11413.html – Ninad

+0

разве это не так в случае "async «издатели в версии (<= 0,9), они не поймают всех сбоев в обратном вызове (например, в сети, истечении срока действия пакета), и у вас все еще есть некоторые потерянные сообщения? – kisna

+0

Эти настройки от нового производителя. – amethystic

Смежные вопросы