Я тестирую Spring-AMQP
с поддержкой Spring-Integration
, я следующие настройки и тестирования:Spring AMQP Интеграция - Потребитель Руководство Квитирование
<rabbit:connection-factory id="connectionFactory" />
<rabbit:queue name="durableQ"/>
<int:channel id="consumingChannel">
<int:queue capacity="2"/> <!-- Message get Acked as-soon-as filled in Q -->
</int:channel>
<int-amqp:inbound-channel-adapter
channel="consumingChannel"
queue-names="durableQ"
connection-factory="connectionFactory"
concurrent-consumers="1"
acknowledge-mode="AUTO"
/>
public static void main(String[] args) {
System.out.println("Starting consumer with integration..");
AbstractApplicationContext context = new ClassPathXmlApplicationContext(
"classpath:META-INF/spring/integration/spring-integration-context-consumer.xml");
PollableChannel consumingChannel = context.getBean("consumingChannel",
PollableChannel.class);
int count = 0;
while (true) {
Message<?> msg = consumingChannel.receive(1000);
System.out.println((count++) + " \t -> " + msg);
try { //sleep to check number of messages in queue
Thread.sleep(50000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
В этой конфигурации это было очевидно, что как только приходит сообщение на consumingChannel
они Acked и, следовательно, удаляется из очереди. Я подтвердил это, поставив высокий sleep
после receive
и проверив queue-size
. Дальнейшего контроля над этим нет.
Теперь, если я установил acknowledge-mode=MANUAL
, нет способов сделать ручное ack
через интеграцию с пружиной.
. Мне нужно обработать сообщение и после обработки сделать manual-ack
, так что до ack
сообщение остается на уровне durableQ
.
Есть ли способ обработки MANUAL
ack с spring-amqp-integration
? Я хочу избежать прохождения ChannelAwareMessageListener
до inbound-channel-adapter
, так как я хочу иметь контроль над потребительским receive
.
Update:
Он даже не представляется возможным при использовании собственного listener-container
с inbound-channel-adapter
:
// Below creates a default direct-channel (spring-integration channel) named "adapter", to receive poll this channel which is same as above
<int-amqp:inbound-channel-adapter id="adapter" listener-container="amqpListenerContainer" />
<bean id="amqpListenerContainer" class="org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory" />
<property name="queueNames" value="durableQ" />
<property name="acknowledgeMode" value="MANUAL" />
// messageListener not allowed when using with adapter, so no way of having own ChannelAwareMessageListener, so no channel exposed onMessage, hence no way to ack
<property name="messageListener" ref="listener"/>
</bean>
<bean id="listener" class="com.sd.springint.rmq.MsgListener"/>
Над конфигурации бросает ошибку как messageListener
свойство не допускается, см инлайн комментарий на тег. Таким образом, цель использования listner-container
оказалась побежденной (для разоблачения channel
через ChannelAwareMessageListener
).
Мне spring-integration
не может быть использован для manual-acknowledgement
(я знаю, это трудная поговорка!), Может ли кто-нибудь помочь мне в проверке этого или есть ли какой-либо конкретный подход/конфигурация, требуемые для этого, которых я не хватает?
Итак,' PollableChannel' не то, что мне нужно (и это логично), но вы предполагаете, что и 'DirectChannel', и' SubscribableChannel' могут соответствовать моему требованию , Я немного смущен здесь? И как/когда происходит «ack» в этих случаях? Тем не менее, я пытаюсь использовать эти подходы. – harsh
'DirectChannel' - это реализация' SubscribableChannel' (точно так же, как 'QueueChannel' - это реализация' PollableChannel'. –
Я должен был попробовать, прежде чем отправлять комментарий выше, мой плохой! отлично. Так что в основном для меня - 1) Режим 'MANUAL' не должен использоваться в' spring-amqp-integration' (в основном с 'int-amqp: inbound-channel-adapter') в качестве" не подвергаются И 2) используют свойство DirectChannel и 'concurrent-consumer'. Большое спасибо, ваша помощь очень ценится. – harsh