2013-09-26 2 views
2

Я пытаюсь настроить Storm для объединения потока, но с различными (доступными DRPC) метриками в том же потоке.#Storm: как настроить различные показатели для одного и того же источника данных

E.g. поток состоит из сообщений, у которых есть отправитель, получатель, канал, через который поступило сообщение, и шлюз, через который он был доставлен. У меня возникли проблемы с решением, как организовать одну или несколько топологий, которые могли бы дать мне, например. общее количество сообщений по шлюзу и/или по каналу. И кроме того, количество минут в минуту было бы неплохо.

Основная идея - иметь носик, который будет принимать события обмена сообщениями, и оттуда собирать данные по мере необходимости. В настоящее время я играю с Trident и DRPC, и я придумал две возможные топологии, которые решают проблему на данном этапе. Не могу решить, какой подход лучше, если он есть ?!

Весь источник доступен в этом gist. Он имеет три класса:

  • RandomMessageSpout
    • , используемый для испускают данные сообщений
    • имитирует реальный источник данных,
  • SeparateTopology
    • создает отдельный DRPCПоток для каждой метрики необходим
    • также отдельный запрос состояние создается для каждой метрики
    • все они используют тот же Носик экземпляр
  • CombinedTopology
    • создает единый поток DRPC со всеми метриками необходимо
    • создает отдельное состояние запроса для каждой метрики
    • Каждое состояние запроса извлекает желаемую метрику и группы re зультатов для него

Теперь, проблемы и вопросы:

  • SeparateTopology
    • является необходимым использовать тот же экземпляр носик или я могу только сказать, новый RandomMessageSpout () каждый раз?
    • Мне нравится идея, что мне не нужно сохранять сгруппированные данные по всем метрикам, но только группы, которые нам нужно извлечь позже.
    • - это выброшенные данные, фактически обработанные всеми комбинациями состояний/запросов, например. а не первый?
    • будет ли это также позже включать динамическое добавление новых комбинаций состояний/запросов во время выполнения?
  • CombinedTopology
    • мне не очень нравится идея, что мне нужно для сохранения данных, сгруппированных по всем метрикам, так как я не нужно все комбинации
    • это стало неожиданностью, что все показатели всегда возвращают одни и те же данные
      • eg канала и шлюза запросы возвращают метрики состояния данных
      • я обнаружил, что это всегда данные, сгруппированные по первому полю в state definition
      • this topic объясняет рассуждение за этим поведением
      • , но мне интересно, если это хороший способ делать редеет в первую очередь (и найдет способ обойти эту проблему, если это необходимо)
  • SnapshotGet против TupleCollectionGet в stateQuery
    • с моментальным снимкомПочему все работает, но не всегда, только TupleCollectionGet решил проблему
    • любые указатели относительно того, что является правильным способом сделать это?

Я предполагаю, что это удлиненно вопрос/тема, но любая помощь очень ценится! Кроме того, если я полностью упустил архитектуру, предложения о том, как это сделать, будут наиболее желанными. Заранее спасибо :-)

ответ

0

Вы не можете фактически разделить поток в SeparateTopology, вызывая newStream() используя тот же экземпляр носик, так как это позволит создать новые экземпляры одного и того же RandomMessageSpout носиком, что приведет к дублированию значений излучаемого к вашей топологии несколькими, отдельными экземплярами носика. (Распараллеливание Spout возможно только в Storm с секционированными носиками, где каждый экземпляр носика обрабатывает раздел всего набора данных - например, раздел Kafka).

Правильный подход здесь, чтобы изменить CombinedTopology разделить поток на несколько потоков по мере необходимости для каждой метрики вам нужно (смотри ниже), а затем сделать groupBy() полем соответствующего показателя и persistentAggregate() на каждом новом разветвленного потоке.

От Trident FAQ,

"каждый" возвращает объект поток, который можно хранить в переменной. Затем вы можете запустить несколько eaches на том же потоке, чтобы разбить его, например .:

Stream s = topology.each(...).groupBy(...).aggregate(...) 
Stream branch1 = s.each(...) 
Stream branch2 = s.each(...) 

См this thread на список рассылки Шторма и this one для получения дополнительной информации.

Смежные вопросы