2015-11-02 3 views
2

Я реализую болт в Storm, который получает сообщения от носа RabbitMQ (https://github.com/ppat/storm-rabbitmq).Apache Storm Join Pattern - по крайней мере один раз

Каждое событие, которое я должен обработать в Storm, появляется в виде двух сообщений от Кролика, поэтому у меня есть поля на болтах, так что два сообщения поступают в один и тот же болт.

Моего первый подход, я бы:

  1. Получит первый кортеж и сохранить сообщение в памяти
  2. Ack первого кортеж
  3. Когда второй Кортеж прибыл принести первый из памяти и испускает новый кортеж привязанный ко второму от носика.

Это сработало, но я мог потерять сообщения, если работник умер, потому что я бы выиграл первый кортеж, прежде чем получить вторую и обработать.

Я изменил это:

  1. Получит первый кортеж и сохранить его в памяти
  2. Когда второй Кортеж прибыл принести первый из памяти, выделяющий новый кортеж закрепленного на оба входных кортежи и Ack как вход кортежи.

Кэш в памяти - это кэш Guava с истечением срока действия и при высылке Tuple из-за таймаута я потерю() его в топологии, чтобы он был повторно обработан последним.

Это, казалось, сработало, но когда я провел несколько тестов, я попал в ситуацию, когда система перестала получать сообщения из очереди кроликов.

Предварительная выборка в очереди установлена ​​на 5, а носик - с setMaxSpoutPending на 7. В интерфейсе Rabbit я вижу 5 Unacked messages.

В штормовых журналах я вижу, что одни и те же кортежи выселяются из кеша снова и снова.

Я понимаю, что проблема в том, что носик получит только 5 сообщений, которые являются первой частью пары. Я могу увеличить предварительную выборку, но это не гарантирует, что это не произойдет в производстве.

Так что мой вопрос: Как реализовать объединение при работе с этими проблемами в Storm?

ответ

1

Шторм не обеспечивает хорошее решение для этого ... Что бы нужно, это надежного хранение, буфера первого кортежа (т.е. с сохранением состояния оператора). Таким образом, вы можете сразу получить первый кортеж и восстановить состояние после сбоя.

  1. Насколько я знаю, Trident поддерживает некоторую обработку состояния. Но я никогда не использовал его.
  2. В качестве второй альтернативы вы можете использовать распределенный хранилище ключей (например, Casandra) в качестве буфера. Конечно, это будет рукописное решение, т. Е. Вам нужно закодировать все взаимодействия Casandra самостоятельно.
  3. Последнее, но не менее важное: вы можете переключиться на систему обработки потоков, которая поддерживает поддерживающие состояние операторы, такие как Apache Flink.(отказ от ответственности: я являюсь коммиттером в Флинке)
Смежные вопросы