2

Моей системы выглядит следующим образом:Дросселирование входящих AMQP сообщений

[Q1] --> Service1 --> [Q2] --> ...(processing)... --> ServiceN --> [Outbound queue] 

Очередь RabbitMQ 3.5.6. Я использую Spring Integration 4.2.1, Spring AMQP 1.5.1 и Spring Integration Java DSL 1.1.0.

Я хотел бы придушить потребление сообщений из очереди Q1 по Service1 в зависимости от того, сколько сообщений в настоящее время в обработке и не достигли Outbound queue - например. Я хочу иметь макс. 10 сообщений обрабатываются одновременно. Это потому, что часть обработки потребляет много ресурсов, и я не хочу перегружать систему.

Моя текущая конфигурация начальной части потока просто следующим образом:

IntegrationFlows 
    .from(Amqp.inboundAdapter(connectionFactory, "Q1")) 
    .handle(message -> service1.process(message.getPayload()) 
    .get(); 

Service1 и ServiceN могут общаться (это же JVM), так что я в состоянии реализовать механизм блокировки между ними так что блоки service1.process() перед выполнением выполнения, если достигнут предел сообщения «в обработке». Вот что - если я правильно понимаю - @Gary Russell предложил в this comment. Тем не менее, это приведет к тому, что сообщения будут получены от брокера и ненадолго там будут находиться в состоянии без права. Есть ли способ просто не получать сообщения из очереди?

answer of @Artem Bilan для использования SimpleMessageListenerContainer.stop()/.start() кажется довольно тяжеловесным взглядом на реализацию и все логики выключения/запуска, которые будут вызываться.

Также оба ответа - два года. Любые лучшие предложения?

ответ

1

Неиспользованный адаптер, управляемый сообщениями, нет. Брокер будет отправлять сообщения потребителю (согласно prefetch-count).

Я не уверен, почему вы возражаете против сообщения, сидящего в состоянии un-ack'd.

В качестве альтернативы можно использовать простой опрашиваемые <int:inbound-channel-adapter/>, с скажем, 10 нитей в Poller, а в POJO адаптер вызывает, использовать RabbitTemplate к сообщения, но это менее эффективно, чем адаптер управляемых сообщений.

+0

Спасибо, Гэри - это было не очень сильное возражение против сбора сообщений, а затем ожидания с их обработкой. Я думал, что это будет просто чище (с точки зрения дизайна) не потреблять их, если я знаю, что я не могу их обработать в данный момент. Но, в конце концов, с дросселем прямо в начале потока тоже хорошо. –