У меня есть следующие конфигурации XML для адаптера исходящего канала Кафки:Java DSL Эквивалент Spring Integration Кафки Endpoint сконфигурированных в XML
<int-kafka:outbound-channel-adapter id="kafkaOutboundChannelAdapter"
kafka-producer-context-ref="kafkaProducerContext"
auto-startup="true"
channel="activityOutputChannel">
<int:poller fixed-delay="1000" time-unit="MILLISECONDS" receive-timeout="0" task-executor="taskExecutor"/>
</int-kafka:outbound-channel-adapter>
<task:executor id="taskExecutor"
pool-size="5-25"
queue-capacity="20"
keep-alive="120"/>
Это работает просто отлично. Я пытаюсь воспроизвести это в Java DSL, но я не могу зайти слишком далеко. До сих пор, я просто так:
.handle(Kafka.outboundChannelAdapter(kafkaConfig)
.addProducer(producerMetadata, brokerAddress)
.get());
Я не могу понять, как добавить taskExecutor
и poller
с DSL.
Любое понимание того, как включить их в мой общий IntegrationFlow
, оценено.