2016-09-09 6 views
0

Я пытаюсь добавить перехватчик для проверки сообщений, опубликованных продюсером на тему Кафки. Мне нужно сделать несколько проверок в дополнение к проверке схемы, которая выполняется по теме Кафки. Шаги, которые я последовал, следующие.Производитель Kafka Interceptor

  1. Я написал класс Java, расширяющий интерфейс ProducerInterceptor.
  2. Скомпилировал класс и создал файл jar, который помещается в папку, включенную в путь к классам.
  3. Добавлен intercetors.classes = имя_файла в production.properties внутри установки Kafka.

Но когда я публикую сообщение в теме, пользовательский класс перехватчика, который я написал, не вызывается. (Я тоже не получаю ошибок. Сообщения публикуются в теме отлично).

I бессмыслица называют https://cwiki.apache.org/confluence/display/KAFKA/KIP-42%3A+Add+Producer+and+Consumer+Interceptors

Пожалуйста консультации по этому вопросу.

ответ

0

Имя свойства interceptor.classes, не intercetors.classes

+0

благодарит за ответ Крис. Да, он правильно упоминается как interceptor.classes в production.properties. Извините за опечатку. –

0

Этот вопрос довольно старый, поэтому я предполагаю, что вы нашли решение в то же время. Однако на всякий случай это помогает кому-то другому, я обнаружил, что мой класс ProducerInterceptor, который отправляет сообщения по разным темам на основе содержимого сообщения, не вызывается, если у моего потока уже не указан указанный вывод.

Моя первая попытка выглядела примерно так, потому что я думал, что мне не нужно указывать тему вывода. Это не работает:

val builder: KStreamBuilder = new KStreamBuilder 
val input = builder.stream("input-topic") 

val stream: KafkaStreams = new KafkaStreams(builder, streamsConfigWithProducerInterceptor) 
stream.start() 

Но это делает:

val builder: KStreamBuilder = new KStreamBuilder 
val input = builder.stream("input-topic").through("dummy-output-topic") 

val stream: KafkaStreams = new KafkaStreams(builder, streamsConfigWithProducerInterceptor) 
stream.start() 

Стоит отметить, что ничего не будет опубликовано на этой dummy-output-topic во втором примере, и что to вместо through также, как представляется, работают с использованием так же.

В моем случае, я был вызов map изменить записи перед использованием перехватчика направить их на разные темы, так что мой код на самом деле выглядит так:

val builder: KStreamBuilder = new KStreamBuilder 
val input = builder.stream("input-topic") 
    .map(new CustomKeyValueMapper) 
    .through("dummy-output-topic") 

val stream: KafkaStreams = new KafkaStreams(builder, streamsConfigWithProducerInterceptor) 
stream.start() 

Я надеюсь, что эти примеры помогают, кто работает с ProducerInterceptor s, кто сделал ту же ошибку, что и я.