2015-05-08 4 views
0

Я прочитал о штурме Apache и сделал несколько основных уроков. У меня есть следующая топология, которую я хотел бы реализовать с помощью шторма, но не уверен, как обрабатывать распределение данных. Требования к бизнесу: оценить портфель клиентов в режиме реального времени. В упрощенной форме он включает: 1) Принять живой пар рыночных цен (валюты, товары и т. Д.) 2) Для каждого ценового тика рассчитать текущую прибыль каждой позиции и конвертировать ее в валюту счета клиента 3) Анализ общее количество p/l и объем всех позиций на одного клиента и генерировать сигналы, если требуется 4) На уровне заказчика расчет должен быть последовательным и атомарным/сериализованным. I.e. все позиции должны оцениваться с каждым тиком в порядке его ввода в систему, а итоговые суммы должны рассчитываться на основе той же цены, даже если клиент имеет 100 позиций. 5) Анализ объемов/трендов всех позиций в системе, агрегированных по символу/типу клиента/стране/и т. Д., И сделать их доступными в какой-либо приборной панели.Параллелизм данных в Storm

Все заказы выполняются и хранятся в rdbms. Мой главный вопрос заключается в том, как распределить 100 тысяч тысяч позиций по штормовым болтам на разных узлах, которые каждый узел обрабатывает самостоятельно. Использование Modulo достаточно хорошо для разделения клиентов, но как я могу предоставить идентификатор каждому экземпляру болта, чтобы каждый из них обрабатывал только свою равную часть клиентов? Есть что-то из коробки в «Шторме»? Еще один вопрос: как эффективно действовать над агрегациями?

ответ

0

вы можете использовать fieldsGrouping для этого. вы можете объявить поле, по которому сгруппированы кортежи (в вашем случае id).

Я просто полагаю, что ваш входной поток JSON объект с идентификатором и полем тела как

{"id":"1234","body":"some body"} 

Также предположит, топология имеет один носик, два болта, а именно Болт и BoltB.

В BoltB, переопределите метод declareOutputFields и заполните деталь.

public void declareOutputFields(OutputFieldsDeclarer declarer) { 
    declarer.declare(new Fields("id","log")); 
} 

И вы можете объявить топологию как ниже

TopologyBuilder builder = new TopologyBuilder(); 
builder.setSpout("spout", spout, 1); 
builder.setBolt("boltA", new BoltA(), 1) 
     .shuffleGrouping("spout"); 
builder.setBolt("counterBolt", new BoltB(), 1).fieldsGrouping("boltB", new Fields("id")); 

В этом случае, кортежи с одинаковым идентификатором из boltA будут доставлены в тот же экземпляр boltB

+0

Можете ли вы ответить на мой второй вопрос Что ж? –

Смежные вопросы