2015-04-16 5 views
1

Что я могу сделать, это группировать поток по двум полям ("remote-client-ip", "request-params") и подсчитывать количество кортежей в каждой группе. И объедините их в карту. Вот моя топология:Группировка с несколькими полями Storm

topology.newStream("kafka-spout-stream-1", repeatSpout) 
        .each(new Fields("str"), new URLParser(), new Fields(fieldNames)) 
        .each(new Fields("remote-client-ip", "request-params"), new HTTPParameterExtractor(), new Fields("query-string")) 
        .groupBy(new Fields("remote-client-ip", "query-string")) 
        .aggregate(new Fields("remote-client-ip", "query-string"), new Count(), new Fields("user-word-count")) 
        .groupBy(new Fields("remote-client-ip")) 
        .persistentAggregate(new MemoryMapState.Factory(), new UserQueryStringCombiner(), new Fields("user-word-count-list")); 

Но после отладки, я нашел поток данных блокируются на первый groupBy(), который является многопрофильным полой группировкой. Я ничего не выполнил для Count() в следующей заявке агрегата.

Таким образом, я думаю, что неправильно понимаю концепцию взаимодействия между группированием нескольких полей и агрегацией.

Пожалуйста, дайте мне знать, правильные или неправильные мои предположения. Спасибо!

ответ

1

Вы группируете уже сгруппированные поля с помощью функции Aggregate() в своей топологии. Попробуйте это:

.aggregate(new Count(), new Fields("user-word-count")) 

Вместо этого:

.aggregate(new Fields("remote-client-ip", "query-string"), new Count(), new Fields("user-word-count")) 
Смежные вопросы