Моей системы выглядит следующим образом:Дросселирование входящих AMQP сообщений
[Q1] --> Service1 --> [Q2] --> ...(processing)... --> ServiceN --> [Outbound queue]
Очередь RabbitMQ 3.5.6. Я использую Spring Integration 4.2.1, Spring AMQP 1.5.1 и Spring Integration Java DSL 1.1.0.
Я хотел бы придушить потребление сообщений из очереди Q1
по Service1
в зависимости от того, сколько сообщений в настоящее время в обработке и не достигли Outbound queue
- например. Я хочу иметь макс. 10 сообщений обрабатываются одновременно. Это потому, что часть обработки потребляет много ресурсов, и я не хочу перегружать систему.
Моя текущая конфигурация начальной части потока просто следующим образом:
IntegrationFlows
.from(Amqp.inboundAdapter(connectionFactory, "Q1"))
.handle(message -> service1.process(message.getPayload())
.get();
Service1
и ServiceN
могут общаться (это же JVM), так что я в состоянии реализовать механизм блокировки между ними так что блоки service1.process()
перед выполнением выполнения, если достигнут предел сообщения «в обработке». Вот что - если я правильно понимаю - @Gary Russell предложил в this comment. Тем не менее, это приведет к тому, что сообщения будут получены от брокера и ненадолго там будут находиться в состоянии без права. Есть ли способ просто не получать сообщения из очереди?
answer of @Artem Bilan для использования SimpleMessageListenerContainer.stop()/.start()
кажется довольно тяжеловесным взглядом на реализацию и все логики выключения/запуска, которые будут вызываться.
Также оба ответа - два года. Любые лучшие предложения?
Спасибо, Гэри - это было не очень сильное возражение против сбора сообщений, а затем ожидания с их обработкой. Я думал, что это будет просто чище (с точки зрения дизайна) не потреблять их, если я знаю, что я не могу их обработать в данный момент. Но, в конце концов, с дросселем прямо в начале потока тоже хорошо. –