2015-08-19 2 views
4

Я использую Kafka Spring Integration для публикации и потребления сообщений с использованием kafka. Я вижу, что Payload правильно передается от производителя к потребителю, но информация заголовка становится где-то переопределенной.Kafka Spring Integration: заголовки не подходят для потребителя kafka

@ServiceActivator(inputChannel = "fromKafka") 
public void processMessage(Message<?> message) throws InterruptedException, 
     ExecutionException { 
    try { 
      System.out.println("Headers :" + message.getHeaders().toString()); 
     } 
    } catch (Exception e) { 
     e.printStackTrace(); 
    } 
} 

я получаю следующие заголовки:

Headers :{timestamp=1440013920609, id=f8c645f7-677b-ec32-dad0-a7b79082ef81} 

Я Построив сообщение в конце производителя, как это:

Message<FeelDBMessage> message = MessageBuilder 
       .withPayload(samplePayloadObj) 
       .setHeader(KafkaHeaders.MESSAGE_KEY, "key") 
       .setHeader(KafkaHeaders.TOPIC, "sampleTopic").build(); 

     // publish the message 
     publisher.publishMessage(message); 

и ниже является информация заголовка на производителя:

headers={timestamp=1440013914085, id=c4159c1c-2c67-634b-ef8d-3fb026b1172e, kafka_messageKey=key, kafka_topic=sampleTopic} 

Любая идея, почему Heade rs переопределены другим значением?

ответ

5

Просто потому, что по умолчанию Framework использует неизменяемыйGenericMessage.

Любая манипуляция с существующим сообщением (например, MessageBuilder.withPayload) приведет к созданию нового экземпляра GenericMessage.

С другой стороны, Kafka не поддерживает абстракции как JMS или AMQP. headers. Вот почему KafkaProducerMessageHandler просто сделать это, когда он публикует сообщение Кафке:

this.kafkaProducerContext.send(topic, partitionId, messageKey, message.getPayload()); 

Как вы видите, он не посылает headers вообще. Таким образом, другая сторона (потребитель) имеет дело только с message с темой как payload и некоторые системные параметры, такие как headers как topic, partition, messageKey.

В двух словах: мы не передаем заголовки над Кафкой, потому что они не поддерживают их.

+0

Спасибо Артем за ваш ответ. Но в документе kafka-spring-integration говорится следующее: Целевая тема и раздел для публикации сообщения могут быть настроены через заголовки kafka_topic и kafka_partitionId соответственно. Существует фрагмент кода тоже, который устанавливает заголовок при отправке сообщения: channel.send ( MessageBuilder.withPayload (полезная нагрузка) .setHeader (KafkaHeaders.MESSAGE_KEY, "ключ") .setHeader (KafkaHeaders.TOPIC, «тест») .build() ); – zer0Id0l

+0

Он также говорит, что: «Важно Интерфейс KafkaHeaders содержит константы, используемые для взаимодействия с заголовками Заголовки messageKey и тема по умолчанию в настоящее время требуют kafka_ префикс.» Если заголовки не передается, что является использование класса KafkaHeaders? – zer0Id0l

+0

Используется для внутренней интеграции Spring. Например, при отправке части для динамического разрешения тем. Пожалуйста, не смешивайте Spring Integration Kafka Support и Apache Kafka. Нечто похожее вы можете найти в модуле «Файл». –