2013-04-30 4 views
2

Я пытаюсь внедрить простую программу производителя -> Kafka -> Consumer на Java. Я могу произвести, а также успешно использовать сообщения, но проблема возникает, когда я перезапускаю пользователя, когда некоторые из уже потребляемых сообщений снова получают потребитель от Kafka (не все сообщения, но некоторые из последних потребляемые сообщения).Простая копия доставки сообщений Simple-Kafka-consumer

У меня есть autooffset.reset=largest у моего потребителя, а мое имущество autocommit.interval.ms настроено на 1000 миллисекунд.

Является ли эта «повторная доставка некоторых уже потребляемых сообщений» известной проблемой, или есть какие-либо другие настройки, которые мне не хватает здесь?

В принципе, существует ли способ обеспечить, чтобы ни один из ранее потребляемых сообщений не получал/потреблял потребитель?

ответ

3

Kafka использует Zookeeper для хранения потребительских смещений. Поскольку операции Zookeeper довольно медленные, не рекомендуется фиксировать смещение после потребления каждого сообщения.

К загрузке можно добавить крюк остановки, который будет вручную фиксировать смещение темы перед выходом. Однако это не поможет в определенных ситуациях (например, сбой jvm или kill -9). Чтобы снова защищать эти ситуации, я бы посоветовал реализовать пользовательскую логику фиксации, которая будет фиксировать смещение локально после обработки каждого сообщения (файла или локальной базы данных), а также фиксировать смещение Zookeeper каждые 1000 мс. При запуске потребителей оба этих местоположения должны быть запрошены, и в качестве смещения потребления следует использовать максимум два значения.

+0

это может показаться глупым, но если мы скажем, например, реализуем логику пользовательской фиксации, тогда можно управлять смещением для каждого сообщения. Например, если у меня есть два сообщения со значением timestamp в нем, я хотел бы установить смещение на основе метки времени. Поэтому, если вторая запись имеет более раннюю временную метку, тогда назначенное ей смещение должно быть меньше, чем другое. Поэтому во время потребления я получаю сообщения, которые уже отсортированы. – user2720864