2009-06-30 5 views
1

Коллекция агрегатор используется в 2.0 рамках Mule работает немного, как это:Mule агрегатор - Streaming Aggregation

  • Входящее маршрутизатор принимает коллекцию сообщений и разбивает его на несколько более мелких сообщений - каждый меньше сообщения получить штамп с корреляционной идентификатор, соответствующей родительским сообщением

  • Эти сообщения проходят через различные услуги

  • Наконец эти сообщения поступают на входящем агрегаторе, что собирает сообщения на основе идентификатора корреляции родительского сообщения и количества ожидаемых сообщений. После того как все ожидаемые сообщения были получены, вызывается функция агрегации и возвращается результат.

Теперь это нормально работает, когда количество сообщений в группе достаточно мало. Однако, как только количество сообщений в группе становится огромным ~ 100k, тогда большая часть памяти привязывается к группе сообщений, ожидающих прибытия более поздних сообщений. Это ухудшается, если одновременно группируются несколько групп.

Путь к этой проблеме заключается в реализации потокового агрегатора. В моем случае использования я, по сути, суммирую различные сообщения на основе ключа, и это можно сделать без необходимости одновременного просмотра всех сообщений в группе. Я хотел бы только знать, что все сообщения были получены до отправки результата на конечную точку.

Это звучит как разумное решение проблемы?

Это уже реализовано где-то в Муле?

Есть ли лучшие способы сделать это?

ответ

2

Это похоже на разумный подход (я не специалист по Mule каким-либо образом), и я прочитал всю документацию Mule и не думаю, что есть что-то вроде этого (поддержка потоковой передачи ограничена к нескольким разъемам и трансформаторам - это довольно просто, поскольку он просто проходит через InputStream). Только несколько вещей в потоке Mule, поэтому вам может понадобиться использовать другие модифицированные трансформаторы (если вы их используете). Вы бы просто реализовали агрегатор, который предоставляет InputStream и запускает потоковое вещание, как только получится последовательность последовательных сообщений.

Однако одно предложение в вашем описании «... все сообщения были получены до перенаправления результатов в конечную точку», может быть тревожным. Это по своей природе поражает цель потоковой передачи, если вы не имеете в виду, что вы (в своем сервисном компоненте предположительно) будете отслеживать, что у вас есть все, прежде чем перенаправить обработанный результат (предположительно намного меньше).

Смежные вопросы