2015-04-09 2 views
1

У меня есть сценарий в моей настройке RabbitMQ, что мне интересно, как решить. Ниже схема иллюстрирует его (обмены и большинство очередей удалены для краткости):Message Ordering Over Queues

enter image description here

Сценарий

  1. Производитель создает сообщение А (1), его получения верхнего потребителя, который начинает обработку сообщения.
  2. Производитель создает сообщение A (2), оно принимается нижним потребителем (при условии, что оба потребителя находятся на бирже с циклическим переключением).
  3. Нижний потребитель публикует сообщение B (2), которое помещается в очередь потребителя B
  4. Бедный медленный потребитель верхнего уровня окончательно заканчивает и испускает свое сообщение B (1).

Проблема

Если мы предположим, что B потребитель не может быть идемпотент, как мы обеспечиваем результат обоих сообщений B применяются в правильном порядке?

Я подумал о том, чтобы использовать временную метку, применяемую к первоначальной публикации сообщения A, и чтобы потребитель сохранял временную метку последнего изменения, отклоняя любые временные метки до этого времени, но это работает только в том случае, если каждое сообщение вызывает точный такого же изменения и требует много отслеживания.

Другие идеи относительно того, как подойти к этому, будут оценены. Благодаря!

+0

Ваш вопрос, кажется, связанные с [RabbitMQ засаде несколько очередей, чтобы закончить] (http://stackoverflow.com/questions/13861459/rabbitmq-wait-for-multiple-queues-to-finish) –

ответ

2

Я не уверен, что конкретно для RabbitMQ здесь, но идея с отметками времени звучит неплохо, если у вас есть производитель.

Производитель придает метку времени к сообщениям А, каждое сообщение B взять ту же метку времени своего соответствующего сообщения А.

С вашим подходом некоторые сообщения не будут обработаны, например, сообщение B (1). Если все сообщения должны быть обработаны потребителя B, но они должны быть обработаны в неопределенном порядке, то вы можете сделать детерминированный слияние:

Потребитель B оснащен двумя очередями, одна очередь для каждого потребителя A Потребитель B всегда проверяет верхнюю часть обеих очередей:

  • Если обе очереди не являются пустыми, потребитель B выдает сообщение с наименьшей меткой времени.
  • если хотя бы одна очередь пуста, потребитель B ждет.

При таком подходе порядок, в котором потребитель B обрабатывает сообщения, задается метками времени производителя, и ни одно сообщение не отбрасывается.Предположения:

  • очередь FIFO
  • ни один процесс не выходит из строя
  • всегда так, что в конечном счете каждый потребитель обрабатывает сообщение
  • потребителя B может проверить верхнюю часть очереди в неблокирующих модах
+0

Спасибо за это , очень интересно идея. К сожалению, это было бы очень сложно реализовать в нашей конкретной ситуации, поскольку мы не можем гарантировать предположение 3 (в конечном итоге каждый потребитель A создает сообщение) - приведенная выше модель была слегка упрощена, а число потребителей As могло динамически масштабироваться, поэтому очереди потребителей B должны были бы масштабироваться в тандеме с этим. –

+0

Это все усложняет. Некоторые предложения: (1) Если набор потребителей A исправлен, но они иногда молчат (не отправлять сообщения), вы можете решить это, периодически отбиваясь от производителя и заставляя потребителей переходить к управляющему сообщению только для заполнения его очередь в потребителе B. – danyhow

+0

(2) Если набор потребителей A изменяется, подход к этому ответу потребует от вас постоянного согласования членства, например, в самом продюсере или в анкете ZooKeeper. Потребителю B необходимо будет сообщить, до какой отметки, какая группа потребителей A была активной. И снова пульс от производителя до потребителя B может содержать информацию о членстве и времени. (3) Что касается очередей масштабирования в B: B, все равно может быть одна очередь ввода и просто сохранить набор внутренних буферов для сообщений, удаленных из очереди ввода, но которые еще не могут быть обработаны. – danyhow