2016-12-31 3 views
1

Я хочу реализовать конечную точку HTTP, используя Spring Integration, которая прослушивает HTTP-запросы, отправляет данные запроса в виде сообщений на канал, а другая конечная точка должна прослушивать сообщения на этом канале и обрабатывать их.Весна Интеграция прослушивание в очереди без poller

Звуки простые. Но чего я хочу достичь:

  1. Сообщения должны обрабатываться в порядке.
  2. Сообщения должны обрабатываться как можно скорее (без задержки после HTTP-запроса, если очередь уже пуста).
  3. HTTP-запрос должен быть отправлен сразу после получения сообщения, а не после его обработки, поэтому отправитель будет знать только то, что сообщение получено для обработки.
  4. Я не хочу использовать внешние очереди, такие как RabbitMQ.

Для этого мне нужен QueueChannel. Но если я правильно понимаю, единственный способ получить сообщения из очереди - poller. Таким образом, точка 2 не будет удовлетворена. После получения сообщения будет небольшая задержка, и до того, как он увидит ее.

Итак, вопрос: есть ли какой-либо простой способ достичь этого в весенней интеграции, чего я не вижу?

Конечно, я могу реализовать его сам. Например, создавая компонент SmartLifeCycle, который прослушивает DirectChannel и просто помещает сообщения в java.util.concurrent.BlockingQueue, а также запускает выделенный поток, который будет ждать в этой очереди и отправить сообщение в другой DirectChannel для обработки. Поэтому не будет никакой задержки, потому что поток будет разблокирован, как только BlockingQueue не будет пустым.

Все это звучит как «узор» - некоторая очередь между двумя прямыми каналами на основе выделенного потока.

Возможно, есть более простой способ, уже реализованный в Spring Integration, который я просто не вижу из-за отсутствия опыта в этой области?

ответ

1

Точка 2 может быть удовлетворена даже с помощью опроса - просто установите fixed-delay на 0 и/или увеличьте время приема (по умолчанию 1 секунду); поток poller будет блокироваться в очереди до тех пор, пока сообщение не поступит; затем сразу же подождите.

Вы также можете использовать канал исполнителя (HTTP-поток отсылается к потоку исполнителя).

+0

Спасибо, Гэри! Я не понимал, что 'receiveTimeout' на самом деле является временем, что поток poller будет ждать новых сообщений при поддержке« BlockingQueue »до тех пор, пока не отпустит поток Scheduler. И после этого он снова займется потоком в соответствии с «Триггером» опроллера (например, 'delay'). Таким образом, с задержкой '0' и' receiveTimeout' из опроса '1s' будет блокировать поток Scheduler почти все время и обрабатывать сообщения как можно быстрее! Большое вам спасибо! * P.S. Канал исполнителя с однопоточным исполнителем, использующим внутреннюю очередь исполнителя, также является жизнеспособным решением, о котором я не думал! – djxak

+0

Имейте в виду, что любой из этих параметров может привести к потере сообщений в случае сбоя сервера. Если это проблема для вашего приложения, потребуется использование некоторого постоянного хранилища (например, rabbitmq). –

+0

Да, я знаю это. Но сообщение может быть потеряно даже при использовании RabbitMq. Сетевое соединение между вызывающим абонентом и входящей http-конечной точкой, авария приложения после получения сообщения, но до того, как она была отправлена ​​в очередь RabbitMq, авария вызывающего абонента - просто чтобы назвать несколько.Таким образом, вместо использования внешней очереди (которая не будет гарантировать «по крайней мере один раз») в любом случае лучше запланировать периодическую выборку всех сообщений за последние Х минут из внешнего источника и продолжить их идемпотент. Какой-то «компенсационный выбор» для обработки, возможно, потерянных сообщений. – djxak

Смежные вопросы