2017-01-05 2 views
0

Я делаю функции на потоке данных,Flink: Trigger.onElement работает

DataStream<Tuple4<String,String,Double,Double>> price_warning=datastream_in .flatMap(new Splitter())// transformation flatmap .keyBy(2) .window(SlidingProcessingTimeWindows.of(Time.seconds(180),Time.seconds(10))) .trigger(new ElementTimeTrigger()) .apply(new WindowFunction());

Это часть моего кода, только за идею, что я делаю. Здесь, на datastream, я делаю flatmap для анализа datastream_in в Tuple, тогда поток keyby на второй поданной кортежи. После этого я применяю скользящее окно, а затем trigger. Здесь я использую метод onElement() для запуска. Наконец, я использую apply как пользовательскую функцию.

Когда я запускаю код, функция apply называется 18 раз (180/10, скользящее окно) для каждого сообщения, которое я проанализировал. В чем причина этого? Как триггер точно работает с раздвижным окном?

** Кроме того, я могу предоставить весь код.

ответ

0

Trigger указывает, где должно быть выбрано окно. В вашем случае, если вы правильно поняли, вы вызываете окно, излучающее для каждого элемента, который входит в ваш поток, таким образом, номер 18. У SlidingProcessingTimeWindow уже есть триггер по умолчанию, который будет вызывать окна, когда время обработки пройдет в конце окна.

Дополнительную информацию о концепции Trigger вы можете прочитать this

Смежные вопросы