3

Я тестирую Spring-AMQP с поддержкой Spring-Integration, я следующие настройки и тестирования:Spring AMQP Интеграция - Потребитель Руководство Квитирование

<rabbit:connection-factory id="connectionFactory" /> 
<rabbit:queue name="durableQ"/> 
<int:channel id="consumingChannel"> 
    <int:queue capacity="2"/> <!-- Message get Acked as-soon-as filled in Q --> 
</int:channel> 

<int-amqp:inbound-channel-adapter 
    channel="consumingChannel" 
    queue-names="durableQ" 
    connection-factory="connectionFactory" 
    concurrent-consumers="1" 
    acknowledge-mode="AUTO" 
    /> 


public static void main(String[] args) { 
System.out.println("Starting consumer with integration.."); 
    AbstractApplicationContext context = new ClassPathXmlApplicationContext(
    "classpath:META-INF/spring/integration/spring-integration-context-consumer.xml"); 

    PollableChannel consumingChannel = context.getBean("consumingChannel", 
                  PollableChannel.class);   
     int count = 0; 
     while (true) { 
      Message<?> msg = consumingChannel.receive(1000); 
      System.out.println((count++) + " \t -> " + msg); 

      try { //sleep to check number of messages in queue 
       Thread.sleep(50000); 
      } catch (InterruptedException e) { 
       e.printStackTrace(); 
      } 
     } 
    } 

В этой конфигурации это было очевидно, что как только приходит сообщение на consumingChannel они Acked и, следовательно, удаляется из очереди. Я подтвердил это, поставив высокий sleep после receive и проверив queue-size. Дальнейшего контроля над этим нет.

Теперь, если я установил acknowledge-mode=MANUAL, нет способов сделать ручное ack через интеграцию с пружиной.

. Мне нужно обработать сообщение и после обработки сделать manual-ack, так что до ack сообщение остается на уровне durableQ.

Есть ли способ обработки MANUAL ack с spring-amqp-integration? Я хочу избежать прохождения ChannelAwareMessageListener до inbound-channel-adapter, так как я хочу иметь контроль над потребительским receive.

Update:

Он даже не представляется возможным при использовании собственного listener-container с inbound-channel-adapter:

// Below creates a default direct-channel (spring-integration channel) named "adapter", to receive poll this channel which is same as above 
<int-amqp:inbound-channel-adapter id="adapter" listener-container="amqpListenerContainer" /> 

<bean id="amqpListenerContainer" class="org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer"> 
    <property name="connectionFactory" ref="connectionFactory" /> 
    <property name="queueNames" value="durableQ" /> 
    <property name="acknowledgeMode" value="MANUAL" /> 

// messageListener not allowed when using with adapter, so no way of having own ChannelAwareMessageListener, so no channel exposed onMessage, hence no way to ack 
    <property name="messageListener" ref="listener"/> 
</bean> 
<bean id="listener" class="com.sd.springint.rmq.MsgListener"/> 

Над конфигурации бросает ошибку как messageListener свойство не допускается, см инлайн комментарий на тег. Таким образом, цель использования listner-container оказалась побежденной (для разоблачения channel через ChannelAwareMessageListener).

Мне spring-integration не может быть использован для manual-acknowledgement (я знаю, это трудная поговорка!), Может ли кто-нибудь помочь мне в проверке этого или есть ли какой-либо конкретный подход/конфигурация, требуемые для этого, которых я не хватает?

ответ

3

Проблема заключается в том, что вы используете асинхронную передачу обслуживания с использованием QueueChannel. Как правило, лучше контролировать параллелизм в контейнере (concurrent-consumers="2") и не выполнять асинхронные передачи обслуживания в вашем потоке (используйте DirectChannel s). Таким образом, AUTO ack будет работать нормально. Вместо получения от PollableChannel подписки new MessageHandler() на номер SubscribableChannel.

Update:

Вы обычно не должны иметь дело с сообщениями в приложении SI, но эквивалент теста с DirectChannel будет ...

SubscribableChannel channel = context.getBean("fromRabbit", SubscribableChannel.class); 

    channel.subscribe(new MessageHandler() { 

     @Override 
     public void handleMessage(Message<?> message) throws MessagingException { 
      System.out.println("Got " + message); 
     } 
    }); 
+0

Итак,' PollableChannel' не то, что мне нужно (и это логично), но вы предполагаете, что и 'DirectChannel', и' SubscribableChannel' могут соответствовать моему требованию , Я немного смущен здесь? И как/когда происходит «ack» в этих случаях? Тем не менее, я пытаюсь использовать эти подходы. – harsh

+0

'DirectChannel' - это реализация' SubscribableChannel' (точно так же, как 'QueueChannel' - это реализация' PollableChannel'. –

+0

Я должен был попробовать, прежде чем отправлять комментарий выше, мой плохой! отлично. Так что в основном для меня - 1) Режим 'MANUAL' не должен использоваться в' spring-amqp-integration' (в основном с 'int-amqp: inbound-channel-adapter') в качестве" не подвергаются И 2) используют свойство DirectChannel и 'concurrent-consumer'. Большое спасибо, ваша помощь очень ценится. – harsh

0

MANUAL Ack разрешен только через Channel.basicAck(). Таким образом, у вас должен быть доступ к Channel, на котором было получено ваше сообщение.

Попробуйте играть с advice-chain из <int-amqp:inbound-channel-adapter>:

  1. Реализация некоторых Advice в MethodBeforeAdvice
  2. advice-chain на контейнер применяется для ContainerDelegate#invokeListener
  3. Первый аргумент этого метода является точно Channel
  4. Предположим, вы можете разместить на MessageProperties.headers, что Channel в этом Advice
  5. И сконфигурируйте <int-amqp:inbound-channel-adapter> с mapped-request-headers на это Channel.
  6. И в конце попробуйте вызвать basicAck() на то, что заголовок Channel из сообщения интеграции Spring в любом месте вашего потока вниз по потоку.
+0

Спасибо Артем за 'adv-chain' pointer, я пытаюсь это сделать. – harsh

Смежные вопросы