Я пытаюсь определить простой поток сообщений в Spring Integration, который читает с одного канала, а затем отправляет сообщения в очередь Kafka. Для этого я использую spring-integration-kafka. Проблема в том, что я получаю ошибку EvaluationContext
, которую я не могу расшифровать.EvaluationContext Null With Spring Integration и Kafka
Вот моя конфигурация в XML:
<int:channel id="myStreamChannel"/>
<int:gateway id="myGateway" service-interface="com.myApp.MyGateway" >
<int:method name="process" request-channel="myStreamChannel"/>
</int:gateway>
<int:channel id="activityOutputChannel"/>
<int:transformer input-channel="myStreamChannel" output-channel="activityOutputChannel" ref="activityTransformer"/>
<int-kafka:outbound-channel-adapter id="kafkaOutboundChannelAdapter"
kafka-producer-context-ref="kafkaProducerContext"
auto-startup="false"
channel="activityOutputChannel"
topic="my-test"
message-key-expression="header.messageKey">
<int:poller fixed-delay="1000" time-unit="MILLISECONDS" receive-timeout="0" task-executor="taskExecutor"/>
</int-kafka:outbound-channel-adapter>
<task:executor id="taskExecutor"
pool-size="5-25"
queue-capacity="20"
keep-alive="120"/>
<int-kafka:producer-context id="kafkaProducerContext" producer-properties="producerProperties">
<int-kafka:producer-configurations>
<int-kafka:producer-configuration broker-list="kafkaserver.com:9092"
key-class-type="java.lang.String"
value-class-type="java.lang.String"
topic="my-test"
key-encoder="stringEncoder"
value-encoder="stringEncoder"
compression-codec="snappy"/>
</int-kafka:producer-configurations>
</int-kafka:producer-context>
Когда я запускаю мое приложение с помощью Spring ботинка, я получаю это исключение:
Ошибка создания боба с именем «org.springframework.integration.kafka .outbound.KafkaProducerMessageHandler # 0 ': вызов метода init не удался; inested exception is java.lang.IllegalArgumentException: [Assertion failed] - этот аргумент требуется; он не должен быть пустым
Это ошибочная строка в трассировке стеки:
в org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler.onInit (KafkaProducerMessageHandler.java:68)
Вот что происходит на линии 68:
Assert.notNull(this.evaluationContext);
Так EvaluationContext
равна нулю. Понятия не имею почему.
Кстати, когда я заменяю конечную точку Kafka тривиальной конечной точкой stdout
для печати тела сообщения, все работает нормально.
Можете ли вы рассказать мне, что не так с моей конфигурацией конечной точки Kafka, что нет EvaluationContext
?