Я реализую болт в Storm, который получает сообщения от носа RabbitMQ (https://github.com/ppat/storm-rabbitmq).Apache Storm Join Pattern - по крайней мере один раз
Каждое событие, которое я должен обработать в Storm, появляется в виде двух сообщений от Кролика, поэтому у меня есть поля на болтах, так что два сообщения поступают в один и тот же болт.
Моего первый подход, я бы:
- Получит первый кортеж и сохранить сообщение в памяти
- Ack первого кортеж
- Когда второй Кортеж прибыл принести первый из памяти и испускает новый кортеж привязанный ко второму от носика.
Это сработало, но я мог потерять сообщения, если работник умер, потому что я бы выиграл первый кортеж, прежде чем получить вторую и обработать.
Я изменил это:
- Получит первый кортеж и сохранить его в памяти
- Когда второй Кортеж прибыл принести первый из памяти, выделяющий новый кортеж закрепленного на оба входных кортежи и Ack как вход кортежи.
Кэш в памяти - это кэш Guava с истечением срока действия и при высылке Tuple из-за таймаута я потерю() его в топологии, чтобы он был повторно обработан последним.
Это, казалось, сработало, но когда я провел несколько тестов, я попал в ситуацию, когда система перестала получать сообщения из очереди кроликов.
Предварительная выборка в очереди установлена на 5, а носик - с setMaxSpoutPending на 7. В интерфейсе Rabbit я вижу 5 Unacked messages.
В штормовых журналах я вижу, что одни и те же кортежи выселяются из кеша снова и снова.
Я понимаю, что проблема в том, что носик получит только 5 сообщений, которые являются первой частью пары. Я могу увеличить предварительную выборку, но это не гарантирует, что это не произойдет в производстве.
Так что мой вопрос: Как реализовать объединение при работе с этими проблемами в Storm?