2016-12-13 4 views
0

Использование Spring-Integration-Кафку, с исходящим-канальным адаптером Я пытаюсь отправить сообщение в теме с названием «тест»весна-интеграция-Кафка исходящий канал-адаптер Отправить сообщение

Через команду линия терминал, я начал, Кафка зоопарка и создал тему с именем "тест" конфигурации XML

Спринг

<int:publish-subscribe-channel id="inputToKafka" /> 

<int-kafka:outbound-channel-adapter id="kafkaOutboundChannelAdapter" 
            auto-startup="false" 
            channel="inputToKafka" 
            kafka-template="template" 
            sync="true" 
            topic="test"> 
</int-kafka:outbound-channel-adapter> 

<bean id="template" class="org.springframework.kafka.core.KafkaTemplate"> 
    <constructor-arg> 
     <bean class="org.springframework.kafka.core.DefaultKafkaProducerFactory"> 
      <constructor-arg> 
       <map> 
        <entry key="bootstrap.servers" value="localhost:9092" /> 
       </map> 
      </constructor-arg> 
     </bean> 
    </constructor-arg> 
</bean> 

JUnit Test Code

@RunWith(SpringJUnit4ClassRunner.class) 
@ContextConfiguration(locations = { 
     "classpath:kafka-outbound-context.xml" 
     }) 
public class ProducerTest{ 

    @Autowired 
    @Qualifier("inputToKafka") 
    MessageChannel channel; 

    @Test 
    public void test_send_message() { 

     channel.send(MessageBuilder.withPayload("Test Message") 
       .setHeader(KafkaHeaders.TOPIC, "test").build()); 

    } 

} 

Тестовый пример успешного и отладки я найти channel.send() возвращает истину

Я инспектировать тему через командную строку с командой ниже, но я не вижу никаких сообщений в тестовой теме.

бен/kafka-console-consumer.sh --bootstrap-сервер локальный: 9092 --topic тест --from-начало

Может кто-нибудь, почему я не вижу никаких сообщений о мой тест тема?

ответ

1

Вы заглянули в журналы? Вам нужно настроить ключ и значение сериализаторы, в противном случае вы получите

Caused by: org.apache.kafka.common.config.ConfigException: Missing required configuration "key.serializer" which has no default value. 

При использовании Java:

props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); 
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); 

Ключи карта key.serializer и value.serializer.

+0

Спасибо @Gary Russell. Он работал после добавления Key and Value Serializer. –

+0

Gary, для сложных объектов Java, какой должен быть класс Serializer? Будет ли ByteSerializer делать? –

+0

Также есть документация, которая описывает обязательные свойства, которые должны быть установлены как bootstrap.servers, key.serializer, value.serializer и т. Д. Точно так же для потребителей. –