Я делаю функции на потоке данных,Flink: Trigger.onElement работает
DataStream<Tuple4<String,String,Double,Double>> price_warning=datastream_in .flatMap(new Splitter())// transformation flatmap .keyBy(2) .window(SlidingProcessingTimeWindows.of(Time.seconds(180),Time.seconds(10))) .trigger(new ElementTimeTrigger()) .apply(new WindowFunction());
Это часть моего кода, только за идею, что я делаю. Здесь, на datastream, я делаю flatmap для анализа datastream_in
в Tuple
, тогда поток keyby
на второй поданной кортежи. После этого я применяю скользящее окно, а затем trigger
. Здесь я использую метод onElement()
для запуска. Наконец, я использую apply
как пользовательскую функцию.
Когда я запускаю код, функция apply называется 18 раз (180/10, скользящее окно) для каждого сообщения, которое я проанализировал. В чем причина этого? Как триггер точно работает с раздвижным окном?
** Кроме того, я могу предоставить весь код.