Наш потоковый поток Flink публикует сообщения в Kafka. Механизм «повторения» KafkaProducer не срабатывает до тех пор, пока в его внутренний буфер не добавится сообщение.Fink: Потеря данных KafkaProducer
Если перед этим существует исключение, KafkaProducer выкинет это исключение и похоже, что Flink не справляется с этим. В этом случае будет потеря данных.
Похожие Flink код (FlinkKafkaProducerBase):
if (logFailuresOnly) {
callback = new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception e) {
if (e != null) {
LOG.error("Error while sending record to Kafka: " + e.getMessage(), e);
}
acknowledgeMessage();
}
};
}
else {
callback = new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception != null && asyncException == null) {
asyncException = exception;
}
acknowledgeMessage();
}
};
}
Вот сценарий-х мы определили, что приведет к потере данных:
Все Кафка брокеры вниз.
В этом случае перед добавлением сообщения в его буфер KafkaProducer пытается получить метаданные. Если KafkaProducer не может получить метаданные в настроенном тайм-ауте, он выдает исключение.
-Память записи не доступен для записи (существующий ошибка в библиотеке KAFKA 0.9.0.1)
https://issues.apache.org/jira/browse/KAFKA-3594
В обоих вышеуказанных случаях, KafkaProducer не будет повторять, и Флинка будет игнорировать сообщения , сообщения даже не регистрируются. Исключение составляет, но не сообщения, которые не удались.
Возможные обходные пути (настройки Кафка):
- Очень высокое значение тайм-аута метаданных (metadata.fetch.timeout.ms)
- Очень высокое значение для буфера истечения срока действия (request.timeout.ms)
Мы по-прежнему изучаем возможные побочные эффекты изменения вышеуказанных настроек кафки.
Итак, наше понимание правильное? Или можно ли избежать этой потери данных, изменив некоторые настройки Flink?
Спасибо.
Привет Аметист IC. Спасибо за ваш ответ. Да, у нас есть все вышеперечисленные настройки. Таким образом, даже при всех брокерах мы не должны потерять данные. На самом деле это ошибка в Flink. Пожалуйста, проверьте: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Fink-KafkaProducer-Data-Loss-td11413.html – Ninad
разве это не так в случае "async «издатели в версии (<= 0,9), они не поймают всех сбоев в обратном вызове (например, в сети, истечении срока действия пакета), и у вас все еще есть некоторые потерянные сообщения? – kisna
Эти настройки от нового производителя. – amethystic