Использование Spring-Integration-Кафку, с исходящим-канальным адаптером Я пытаюсь отправить сообщение в теме с названием «тест»весна-интеграция-Кафка исходящий канал-адаптер Отправить сообщение
Через команду линия терминал, я начал, Кафка зоопарка и создал тему с именем "тест" конфигурации XML
Спринг
<int:publish-subscribe-channel id="inputToKafka" />
<int-kafka:outbound-channel-adapter id="kafkaOutboundChannelAdapter"
auto-startup="false"
channel="inputToKafka"
kafka-template="template"
sync="true"
topic="test">
</int-kafka:outbound-channel-adapter>
<bean id="template" class="org.springframework.kafka.core.KafkaTemplate">
<constructor-arg>
<bean class="org.springframework.kafka.core.DefaultKafkaProducerFactory">
<constructor-arg>
<map>
<entry key="bootstrap.servers" value="localhost:9092" />
</map>
</constructor-arg>
</bean>
</constructor-arg>
</bean>
JUnit Test Code
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = {
"classpath:kafka-outbound-context.xml"
})
public class ProducerTest{
@Autowired
@Qualifier("inputToKafka")
MessageChannel channel;
@Test
public void test_send_message() {
channel.send(MessageBuilder.withPayload("Test Message")
.setHeader(KafkaHeaders.TOPIC, "test").build());
}
}
Тестовый пример успешного и отладки я найти channel.send() возвращает истину
Я инспектировать тему через командную строку с командой ниже, но я не вижу никаких сообщений в тестовой теме.
бен/kafka-console-consumer.sh --bootstrap-сервер локальный: 9092 --topic тест --from-начало
Может кто-нибудь, почему я не вижу никаких сообщений о мой тест тема?
Спасибо @Gary Russell. Он работал после добавления Key and Value Serializer. –
Gary, для сложных объектов Java, какой должен быть класс Serializer? Будет ли ByteSerializer делать? –
Также есть документация, которая описывает обязательные свойства, которые должны быть установлены как bootstrap.servers, key.serializer, value.serializer и т. Д. Точно так же для потребителей. –