Я хочу реализовать конечную точку HTTP, используя Spring Integration, которая прослушивает HTTP-запросы, отправляет данные запроса в виде сообщений на канал, а другая конечная точка должна прослушивать сообщения на этом канале и обрабатывать их.Весна Интеграция прослушивание в очереди без poller
Звуки простые. Но чего я хочу достичь:
- Сообщения должны обрабатываться в порядке.
- Сообщения должны обрабатываться как можно скорее (без задержки после HTTP-запроса, если очередь уже пуста).
- HTTP-запрос должен быть отправлен сразу после получения сообщения, а не после его обработки, поэтому отправитель будет знать только то, что сообщение получено для обработки.
- Я не хочу использовать внешние очереди, такие как RabbitMQ.
Для этого мне нужен QueueChannel
. Но если я правильно понимаю, единственный способ получить сообщения из очереди - poller
. Таким образом, точка 2 не будет удовлетворена. После получения сообщения будет небольшая задержка, и до того, как он увидит ее.
Итак, вопрос: есть ли какой-либо простой способ достичь этого в весенней интеграции, чего я не вижу?
Конечно, я могу реализовать его сам. Например, создавая компонент SmartLifeCycle
, который прослушивает DirectChannel
и просто помещает сообщения в java.util.concurrent.BlockingQueue
, а также запускает выделенный поток, который будет ждать в этой очереди и отправить сообщение в другой DirectChannel
для обработки. Поэтому не будет никакой задержки, потому что поток будет разблокирован, как только BlockingQueue
не будет пустым.
Все это звучит как «узор» - некоторая очередь между двумя прямыми каналами на основе выделенного потока.
Возможно, есть более простой способ, уже реализованный в Spring Integration, который я просто не вижу из-за отсутствия опыта в этой области?
Спасибо, Гэри! Я не понимал, что 'receiveTimeout' на самом деле является временем, что поток poller будет ждать новых сообщений при поддержке« BlockingQueue »до тех пор, пока не отпустит поток Scheduler. И после этого он снова займется потоком в соответствии с «Триггером» опроллера (например, 'delay'). Таким образом, с задержкой '0' и' receiveTimeout' из опроса '1s' будет блокировать поток Scheduler почти все время и обрабатывать сообщения как можно быстрее! Большое вам спасибо! * P.S. Канал исполнителя с однопоточным исполнителем, использующим внутреннюю очередь исполнителя, также является жизнеспособным решением, о котором я не думал! – djxak
Имейте в виду, что любой из этих параметров может привести к потере сообщений в случае сбоя сервера. Если это проблема для вашего приложения, потребуется использование некоторого постоянного хранилища (например, rabbitmq). –
Да, я знаю это. Но сообщение может быть потеряно даже при использовании RabbitMq. Сетевое соединение между вызывающим абонентом и входящей http-конечной точкой, авария приложения после получения сообщения, но до того, как она была отправлена в очередь RabbitMq, авария вызывающего абонента - просто чтобы назвать несколько.Таким образом, вместо использования внешней очереди (которая не будет гарантировать «по крайней мере один раз») в любом случае лучше запланировать периодическую выборку всех сообщений за последние Х минут из внешнего источника и продолжить их идемпотент. Какой-то «компенсационный выбор» для обработки, возможно, потерянных сообщений. – djxak