1

Я пытаюсь определить простой поток сообщений в Spring Integration, который читает с одного канала, а затем отправляет сообщения в очередь Kafka. Для этого я использую spring-integration-kafka. Проблема в том, что я получаю ошибку EvaluationContext, которую я не могу расшифровать.EvaluationContext Null With Spring Integration и Kafka

Вот моя конфигурация в XML:

<int:channel id="myStreamChannel"/> 
<int:gateway id="myGateway" service-interface="com.myApp.MyGateway" > 
    <int:method name="process" request-channel="myStreamChannel"/> 
</int:gateway> 
<int:channel id="activityOutputChannel"/> 
<int:transformer input-channel="myStreamChannel" output-channel="activityOutputChannel" ref="activityTransformer"/>  
<int-kafka:outbound-channel-adapter id="kafkaOutboundChannelAdapter" 
            kafka-producer-context-ref="kafkaProducerContext" 
            auto-startup="false" 
            channel="activityOutputChannel" 
            topic="my-test" 
            message-key-expression="header.messageKey"> 
    <int:poller fixed-delay="1000" time-unit="MILLISECONDS" receive-timeout="0" task-executor="taskExecutor"/> 
</int-kafka:outbound-channel-adapter> 

<task:executor id="taskExecutor" 
       pool-size="5-25" 
       queue-capacity="20" 
       keep-alive="120"/> 

<int-kafka:producer-context id="kafkaProducerContext" producer-properties="producerProperties"> 
    <int-kafka:producer-configurations> 
     <int-kafka:producer-configuration broker-list="kafkaserver.com:9092" 
              key-class-type="java.lang.String" 
              value-class-type="java.lang.String" 
              topic="my-test" 
              key-encoder="stringEncoder" 
              value-encoder="stringEncoder" 
              compression-codec="snappy"/> 
    </int-kafka:producer-configurations> 
</int-kafka:producer-context> 

Когда я запускаю мое приложение с помощью Spring ботинка, я получаю это исключение:

Ошибка создания боба с именем «org.springframework.integration.kafka .outbound.KafkaProducerMessageHandler # 0 ': вызов метода init не удался; inested exception is java.lang.IllegalArgumentException: [Assertion failed] - этот аргумент требуется; он не должен быть пустым

Это ошибочная строка в трассировке стеки:

в org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler.onInit (KafkaProducerMessageHandler.java:68)

Вот что происходит на линии 68:

Assert.notNull(this.evaluationContext);

Так EvaluationContext равна нулю. Понятия не имею почему.

Кстати, когда я заменяю конечную точку Kafka тривиальной конечной точкой stdout для печати тела сообщения, все работает нормально.

Можете ли вы рассказать мне, что не так с моей конфигурацией конечной точки Kafka, что нет EvaluationContext?

ответ

3

Ваш вопрос относится к версии несоответствия ад. Весенняя ботинок вытаскивает версию Spring Integration Core, которая не поддерживает IntegrationEvaluationContextAware, чтобы заполнить EvaluationContext в KafkaProducerMessageHandler.

Итак, вы должны модернизировать интеграцию весны Kafka до последнего выпуска: https://github.com/spring-projects/spring-integration-kafka/releases

Смежные вопросы