У нас есть задание DataFlow, которое подписано на поток событий PubSub. Мы применили раздвижные окна 1 часа с 10-минутным периодом. В нашем коде мы выполняем Count.perElement, чтобы получить подсчеты для каждого элемента, и затем мы хотим запустить это через Top.of, чтобы получить верхние N элементов.Применение нескольких преобразований GroupByKey в задании DataFlow, при котором окна будут применяться несколько раз
На высоком уровне: 1) Чтение из PubSub IO 2) Window.into (SlidingWindows.of (windowSize) .every (период)) // windowSize = 1 час, период = 10 мин 3) Количество .perElement 4) Top.of (n, compareFunction)
Что мы видим, так это то, что окно применяется дважды, поэтому данные, кажется, обозначены водяным знаком 1 час 40 минут (вместо 50 минут) за текущее время. Когда мы входим в график задания на консоли Dataflow, мы видим, что в данных по двум операциям выполняются две операции groupByKey: 1) Как часть Count.perElement. Водяной знак на данных с этого шага вперед составляет 50 минут по сравнению с текущим временем, которое ожидается. 2) Как часть Top.of (в Combine.PerKey). Водяной знак на этом, кажется, еще на 50 минут отстает от текущего времени. Таким образом, данные в шагах ниже этого обозначены с водяными знаками на 1:40 мин.
Это в конечном счете проявляется в некоторых нижестоящих графиках, опоздавших на 50 минут.
Таким образом, кажется, что каждый раз, когда применяется GroupByKey, окно, похоже, заново заново.
Ожидается ли такое поведение? В любом случае, мы можем сделать окно только применимым для Count.perElement и отключить его после этого?
Наш код что-то на линии:
final int top = 50;
final Duration windowSize = standardMinutes(60);
final Duration windowPeriod = standardMinutes(10);
final SlidingWindows window = SlidingWindows.of(windowSize).every(windowPeriod);
options.setWorkerMachineType("n1-standard-16");
options.setWorkerDiskType("compute.googleapis.com/projects//zones//diskTypes/pd-ssd");
options.setJobName(applicationName);
options.setStreaming(true);
options.setRunner(DataflowPipelineRunner.class);
final Pipeline pipeline = Pipeline.create(options);
// Get events
final String eventTopic =
"projects/" + options.getProject() + "/topics/eventLog";
final PCollection<String> events = pipeline
.apply(PubsubIO.Read.topic(eventTopic));
// Create toplist
final PCollection<List<KV<String, Long>>> topList = events
.apply(Window.into(window))
.apply(Count.perElement()) //as eventIds are repeated
// get top n to get top events
.apply(Top.of(top, orderByValue()).withoutDefaults());
Система не обновляется в каждом GroupByKey, но каждый этап вычисления потенциально добавляет некоторую дополнительную задержку, потому что Top не может произойти до тех пор, пока граф не выведет все элементы в этом окне. У вас есть какие-либо данные о том, сколько данных обрабатывается? Как насчет чисел из операций Count и Top? –
Мы обрабатываем около 80 тыс. Сообщений в начале. Когда дело доходит до Count, мы делаем около 2K сообщений/с, а в верхней части мы имеем одинаковое число (на основе того, что я вижу на панели данных Dataflow). – Piyush
Я бы предположил, что Top тогда просто добавит задержку в десять минут (а не еще 50). Граф будет испускать каждые 10 минут, поэтому Top должен быть всего 10 минут. – Piyush