Flink DataSets не упорядочен, если вы не обрабатываете их в одном потоке, то есть с параллелизмом 1. Вы можете как каждый добавить последовательный индекс к вашим данным и используйте этот индекс в качестве ключа.
Из вашего примера я предполагаю, что значение с индексом 4 из dataset_2 вычисляется из значений 3, 4 и 5 из набора данных_1, то есть каждое значение dataset_2 выводится из трех (или двух) значений dataset_1.
Существует несколько способов сделать то, что вы хотите, некоторые из них просты в реализации, а другие более эффективны.
простой способ сделать то, что вы хотите, чтобы применить FlatMapFunction
на dataset_1, который излучает каждое значение с индексом i
три раза с тремя ключами i-1
, i
и i+1
. После этого вы группируете результирующий набор данных на новые ключи и используете функцию GroupReduce для вычисления нового значения. Этот подход увеличивает объем данных данных dataset_1, но может быть легко распараллелен.
Другой вариант - сделать ручное разделение по диапазону, что похоже на первый подход, но немного более общий. Я снова предполагаю, что значения dataset_1 имеют последовательный атрибут idx
. Используйте FlatMapFunction для назначения partitionIds
значениям, то есть для размера раздела в 100 элементов выполните что-то вроде partitionId = idx/100
. Первый и последний элемент раздела необходимо выпустить дважды. Например, элементы с idx
100 и 199 для partitionId 1 (значения от 100 до 199) должны быть реплицированы на разделы 0 и 2 соответственно, дважды испуская эти значения. После присвоения partitionIds
вы можете groupBy(partitionId)
, sortGroup(idx)
и groupReduce
над всеми элементами раздела. Размер раздела настраивается.
Я не могу принять первое решение, потому что я не могу утроить данные для целей производительности. Для второго решения я не вижу, как обрабатывать перекрывающиеся показания. MapPartition получает раздел как Iterable (по одному элементу за раз), но мне нужно 2 или 3 элемента входного набора данных. Не могли бы вы привести пример, показывающий перекрывающиеся чтения? –
Я расширил описание подхода разбиения диапазона. –
Теперь я понимаю, как мы можем использовать раздел. Благодаря! –