2015-06-22 2 views
4

У меня есть поток объектов deltas (то есть объекты JSON, описывающие изменения для других объектов), поступающие из очереди сообщений сторонних разработчиков. Мне нужно применить их к соответствующим объектам в базе данных (перевод дельт в состояние). Дельты по своей сути упорядочены.Многопотоковое потребление упорядоченной очереди

Как бы то ни было, я намерен передать эти дельта в наш собственный кластер RabbitMQ, откуда группа серверов Java вытащит их, а затем применит их к базе данных (Java - это где логика обновления базы данных централизована).

Приложение дельт должно быть многопоточным, но я хочу, чтобы дельта для данного объекта всегда применялась в порядке. Чтобы действительно гарантировать это, только один поток может обрабатывать дельта для данного объекта.

С этой целью, когда я прочитал их с третьей очереди и перед тем, как поместить их в RabbitMQ, я решил, что могу разделить дельта на очереди на uuid соответствующего объекта. В принципе, каждая дельта имеет поле object_uuid, и я бы по модулю, что uuid, скажем, 50, а затем использовать результат в качестве ключа маршрутизации, чтобы у меня было 50 очередей упорядоченных дельт в RabbitMQ.

В этот момент это просто (хе) вопрос обеспечения того, чтобы у меня был один потребитель в очереди (хотя я все еще могу иметь несколько очередей на одного потребителя). Я думал, что «эксклюзивный» параметр для очереди объявлений в AMQP может дать мне желаемое поведение, и это похоже на то, что происходит, но, к сожалению, оно связано с непосильным побочным эффектом, что очередь удаляется при отключении потребителя (это парк Java серверы, которые появляются вверх и вниз с каждой версией - очереди должны сохраняться между релизами).

Это не может быть необычной дилеммой, но я не вижу ничего, что вполне подходит этой проблеме. Разве нет никакой конструкции в RabbitMQ или AMQP, которую я могу использовать в своих интересах здесь? Есть ли способ переосмыслить эту проблему, которая позволит избежать проблемы? Или мне нужно взглянуть на распределенные блокирующие решения?

+0

Что именно означает 'объект дельта'? Любой шанс вы могли бы привести пример того, что эти дельта и если они все применимы к одному объекту или применяются к нескольким объектам, каждый из которых имеет несколько дельт? – kha

+0

@kha, я обновил вопрос, чтобы уточнить. Каждая дельта имеет внешний ключ для объекта, к которому должна применяться дельта. – jwilner

+0

, так что проблема с урмом сводится к тому, что есть способ, которым rabbitmq может сохранять данные, если потребитель отключается правильно? – nafas

ответ

0

Это то, что я понимаю из вашего вопроса: у вас в основном есть N объектов с M* состояний. Вы хотите, чтобы каждый из этих N объектов выполнялся на разных процессах/потоках, но для состояний M*, которые относятся к n (от N) для применения в последовательности.

Ваше предлагаемое решение выглядит хорошо для меня. Что бы я сделал, так это:

Для каждого объекта создайте отдельную очередь (назовите ее N'), которая является в основном UUID ваших объектов в соответствии с вашим сообщением.

Затем вы имеете сервер/дистрибьютору, который служит трем целям:

  1. Создать постоянную очередь N'
  2. Присвоить один из пулов агентов случайным образом в этой теме, и рассылает его с ними
  3. Слушайте избивать сообщения от этих агентов каждый раз, чтобы убедиться, что они живы. Если это не так, выберите случайного произвольного агента и назначьте ему очередь.

Чтобы сделать этот дистрибутор аварийным и аварийным, он, вероятно, должен при запуске проверять существующие очереди и убедиться, что они назначены определенному агенту. Если нет, он должен назначить их как можно скорее.

Ваших сами агенты несут ответственность за:

  1. Слушая задания очереди из распределителя
  2. Обработки элементов (состояния М») в присвоенных им очереди
  3. Отправить сердцебиения дистрибьютора

Задачей обеспечения каждого агента является exclusive с дистрибьютором. Это не должно быть чьей-либо ответственностью, и вы не можете безопасно передать эту работу технологии обмена сообщениями, или, по крайней мере, я не знаю ни одной очереди, которая может справиться с этим, но вышеупомянутое решение должно с надеждой помочь с вашей проблемой.

Еще одна проблема, которая может возникнуть, когда удалять очередь. Вы можете сделать это в самих агентах, когда они обрабатывают все, но вам нужно убедиться, что они только делают это в общей блокировке с дистрибьютором. То, что вы не хотите, - это удалить очередь . пуст, но дистрибьютор собирается записать.