2016-01-01 2 views
1

В моей программе Flink я преобразовываю свои данные с помощью операции flatMap, которая делит несколько блоков данных на несколько меньших блоков. Эти блоки имеют атрибут «позиция», который описывает их положение в соответствующем исходном блоке. Теперь я использую groupReduce, которому нужно преобразовать все мелкие блоки, которые имеют один и тот же атрибут «позиция». Поэтому он должен быть легко распределен на нескольких узлах. Но когда я запускаю свою программу на нескольких узлах groupReduce выполняются с диоктилфталатом 1.Усиление степени распараллеливания преобразования groupReduce

Я думаю, это потому, что у меня есть только один DataSet, но мне кажется, что GroupedDataSet не доступна в Flink Java API. Есть ли еще одна возможность улучшить доп моей трансформации groupReduce?

Вот код, я использую (фиктивный код пренебрегая «подробность»):

DataSet<SlicedTile> slicedTiles = tiles.flatMap() 
    .groupBy(position) 
    .sortGroup(time) 
    .getDataSet() 
    //Until here the dop is correct 

DataSet<SlicedTile> processedSlicedTiles = slicedTiles.reduceGroup; 

ответ

2

Проблемы с кодом является getDataSet() вызовом. Он возвращает вход операции группировки. Следовательно, набор данных, представленный slicedTiles, не группируется и не сортируется по его группам, а вместо этого является результатом преобразования flatMap, а вызовы groupBy и sortGroup вообще не рассматриваются в программе.

Применение операции groupReduce (или reduce) в негрупповом наборе данных всегда является непараллельной операцией, поскольку все элементы набора входных данных обрабатываются как одна группа.

Логически три преобразования groupBy().sortGroup().reduceGroup() принадлежат друг другу и переводятся в один оператор groupReduce (возможно, с дополнительным объединителем, если GroupReduceFunction можно комбинировать).

Если вы изменили свою реализацию следующим образом, она должна работать должным образом.

DataSet<SlicedTile> slicedTiles = tiles.flatMap() 
    .groupBy(position) 
    .sortGroup(time) 
    .reduceGroup(yourFunction); 

Открою вопрос JIRA добавить JavaDocs методу Grouping.getDataSet() документировать поведение этой функции.

Смежные вопросы