2015-12-23 4 views
3

У меня есть следующие конфигурации XML для адаптера исходящего канала Кафки:Java DSL Эквивалент Spring Integration Кафки Endpoint сконфигурированных в XML

<int-kafka:outbound-channel-adapter id="kafkaOutboundChannelAdapter" 
            kafka-producer-context-ref="kafkaProducerContext" 
            auto-startup="true" 
            channel="activityOutputChannel"> 
    <int:poller fixed-delay="1000" time-unit="MILLISECONDS" receive-timeout="0" task-executor="taskExecutor"/> 

</int-kafka:outbound-channel-adapter> 
<task:executor id="taskExecutor" 
       pool-size="5-25" 
       queue-capacity="20" 
       keep-alive="120"/> 

Это работает просто отлично. Я пытаюсь воспроизвести это в Java DSL, но я не могу зайти слишком далеко. До сих пор, я просто так:

.handle(Kafka.outboundChannelAdapter(kafkaConfig) 
     .addProducer(producerMetadata, brokerAddress) 
     .get()); 

Я не могу понять, как добавить taskExecutor и poller с DSL.

Любое понимание того, как включить их в мой общий IntegrationFlow, оценено.

ответ

2

весной интеграции компонентов (например, <int-kafka:outbound-channel-adapter>) состоят с двумя бобами: AbstractEndpoint принимать сообщения от input-channel и MessageHandler для обработки сообщения.

Итак, Kafka.outboundChannelAdapter() есть примерно MessageHandler. Любые другие конечные точки специфические свойства до второго Consumer<GenericEndpointSpec<H>> endpointConfigurer аргумента .handle() EIP-метода:

.handle(Kafka.outboundChannelAdapter(kafkaConfig) 
    .addProducer(producerMetadata, brokerAddress), 
      e -> e.id("kafkaOutboundChannelAdapter") 
       .poller(p -> p.fixedDelay(1000, TimeUnit.MILLISECONDS) 
             .receiveTimeout(0) 
             .taskExecutor(this.taskExecutor))); 

См Reference Manual для получения дополнительной информации.

Смежные вопросы