2015-07-07 2 views
1

У нас есть задание DataFlow, которое подписано на поток событий PubSub. Мы применили раздвижные окна 1 часа с 10-минутным периодом. В нашем коде мы выполняем Count.perElement, чтобы получить подсчеты для каждого элемента, и затем мы хотим запустить это через Top.of, чтобы получить верхние N элементов.Применение нескольких преобразований GroupByKey в задании DataFlow, при котором окна будут применяться несколько раз

На высоком уровне: 1) Чтение из PubSub IO 2) Window.into (SlidingWindows.of (windowSize) .every (период)) // windowSize = 1 час, период = 10 мин 3) Количество .perElement 4) Top.of (n, compareFunction)

Что мы видим, так это то, что окно применяется дважды, поэтому данные, кажется, обозначены водяным знаком 1 час 40 минут (вместо 50 минут) за текущее время. Когда мы входим в график задания на консоли Dataflow, мы видим, что в данных по двум операциям выполняются две операции groupByKey: 1) Как часть Count.perElement. Водяной знак на данных с этого шага вперед составляет 50 минут по сравнению с текущим временем, которое ожидается. 2) Как часть Top.of (в Combine.PerKey). Водяной знак на этом, кажется, еще на 50 минут отстает от текущего времени. Таким образом, данные в шагах ниже этого обозначены с водяными знаками на 1:40 мин.

Это в конечном счете проявляется в некоторых нижестоящих графиках, опоздавших на 50 минут.

Таким образом, кажется, что каждый раз, когда применяется GroupByKey, окно, похоже, заново заново.

Ожидается ли такое поведение? В любом случае, мы можем сделать окно только применимым для Count.perElement и отключить его после этого?

Наш код что-то на линии:

final int top = 50; 
final Duration windowSize = standardMinutes(60); 
final Duration windowPeriod = standardMinutes(10); 
final SlidingWindows window = SlidingWindows.of(windowSize).every(windowPeriod); 

options.setWorkerMachineType("n1-standard-16"); 
options.setWorkerDiskType("compute.googleapis.com/projects//zones//diskTypes/pd-ssd"); 
options.setJobName(applicationName); 
options.setStreaming(true); 
options.setRunner(DataflowPipelineRunner.class); 

final Pipeline pipeline = Pipeline.create(options); 

// Get events 
final String eventTopic = 
    "projects/" + options.getProject() + "/topics/eventLog"; 
final PCollection<String> events = pipeline 
    .apply(PubsubIO.Read.topic(eventTopic)); 

// Create toplist 
final PCollection<List<KV<String, Long>>> topList = events 
    .apply(Window.into(window)) 
    .apply(Count.perElement()) //as eventIds are repeated 
    // get top n to get top events 
    .apply(Top.of(top, orderByValue()).withoutDefaults()); 
+0

Система не обновляется в каждом GroupByKey, но каждый этап вычисления потенциально добавляет некоторую дополнительную задержку, потому что Top не может произойти до тех пор, пока граф не выведет все элементы в этом окне. У вас есть какие-либо данные о том, сколько данных обрабатывается? Как насчет чисел из операций Count и Top? –

+0

Мы обрабатываем около 80 тыс. Сообщений в начале. Когда дело доходит до Count, мы делаем около 2K сообщений/с, а в верхней части мы имеем одинаковое число (на основе того, что я вижу на панели данных Dataflow). – Piyush

+0

Я бы предположил, что Top тогда просто добавит задержку в десять минут (а не еще 50). Граф будет испускать каждые 10 минут, поэтому Top должен быть всего 10 минут. – Piyush

ответ

1

Windowing не применяется каждый раз, когда есть GroupByKey. Задержка, которую вы видели, вероятно, была результатом двух проблем, которые должны быть решены.

Первым было то, что данные, которые были буферизованы для более поздних окон в первой группе по ключу, препятствовали продвижению водяного знака, что означало, что более ранние окна задерживались во второй группе по ключу. Это было исправлено в последних версиях SDK.

Вторая причина заключалась в том, что раздвижные окна вызывали значительное увеличение объема данных. Добавлена ​​новая оптимизация, в которой используется комбинат (вы упоминали Count и Top), чтобы уменьшить объем данных.

+0

Спасибо, попробуем это. Знаете ли вы, когда ожидается обновление пакета обновления SDK? Мы не против вытащить последние изменения от мастера и попробовать их :-) – Piyush

+0

В последнем выпуске на Maven (0.4.150710) должны быть упомянутые улучшения. Если вы попробуете обновить, сообщите нам, как это происходит. –

Смежные вопросы