Возможно, вы захотите посмотреть here (см. Раздел «Дозирование»). Для болтов, которые обрабатывают более сложные операции, такие как агрегация на нескольких входных кортежах, вам нужно будет расширить BaseRichBolt и самостоятельно управлять механизмом привязки.
Для этого вам нужно объявить свой собственный выходной коллектор, как это:
private OutputCollector outputCollector;
А затем отформатируйте ее с помощью переопределения метода подготовки:
@Override
public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
this.outputCollector = outputCollector;
}
Ваш выполнить метод BaseRichBolt только получает кортеж в качестве аргумента, вы должны иметь возможность выполнять логику для поддержки якорей и использования их при испускании.
private final List<Tuple> anchors = new ArrayList<Tuple>();
@Override
public void execute(Tuple tuple) {
if (!isTupleAggregationComplete(anchors, tuple)) {
anchors.add(tuple);
return;
}
// do your computations here!
collector.emit(anchors, new Values(foo,bar,xpto));
anchors.clear();
}
Вы должны осуществлять isTupleAggregationComplete с необходимой логикой, которая проверяет, если болт есть все необходимая информация, чтобы приступить к обработке.