2016-08-02 2 views
2

Топология выглядит так. The Topology : Когда я использую шторм, как я могу обеспечить, чтобы болт с несколькими входами обрабатывался только тогда, когда все входы поступали?

Как я могу обеспечить, чтобы болт с несколькими входами обрабатывался только при поступлении всех входов?

ответ

0

Bolt.execute() вызывается для каждого входящего кортежа, независимо от того, что производитель (и вы не можете изменить это). Если вы хотите обработать несколько кортежей у разных производителей сразу, вам нужно написать собственный код UDF.

  1. Вам нужен входной буфер для каждого производителя, который может буферизацию входящих кортежей (возможно LinkedList<Tuple>, как задвижка)
  2. Для каждого входящего кортежа, добавьте кортеж в соответствующий буфер (вы можете получить доступ к производителю информация в метаданных tuple, через. input.getSourceComponent()
  3. После добавления кортежа в буфер, вы проверяете, если каждый буфер содержит по крайней мере один кортеж: если да, возьмите один кортеж из каждого буфера, обработайте их (после обработки, проверьте буферы снова, пока, по крайней мере, один раз буфер не будет пустым) - нет, просто возвращайтесь и ничего не обрабатывайте.
0

Возможно, вы захотите посмотреть here (см. Раздел «Дозирование»). Для болтов, которые обрабатывают более сложные операции, такие как агрегация на нескольких входных кортежах, вам нужно будет расширить BaseRichBolt и самостоятельно управлять механизмом привязки.

Для этого вам нужно объявить свой собственный выходной коллектор, как это:

private OutputCollector outputCollector; 

А затем отформатируйте ее с помощью переопределения метода подготовки:

@Override 
public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) { 
    this.outputCollector = outputCollector; 
} 

Ваш выполнить метод BaseRichBolt только получает кортеж в качестве аргумента, вы должны иметь возможность выполнять логику для поддержки якорей и использования их при испускании.

private final List<Tuple> anchors = new ArrayList<Tuple>(); 

@Override 
public void execute(Tuple tuple) { 
    if (!isTupleAggregationComplete(anchors, tuple)) { 
     anchors.add(tuple); 
     return; 
    } 

    // do your computations here! 

    collector.emit(anchors, new Values(foo,bar,xpto)); 

    anchors.clear(); 
} 

Вы должны осуществлять isTupleAggregationComplete с необходимой логикой, которая проверяет, если болт есть все необходимая информация, чтобы приступить к обработке.

Смежные вопросы