2015-09-09 2 views
2

Я хотел бы реализовать алгоритм со следующей схемой доступа (по аналогии с алгоритмом конечных разностей):Flink: перекрывающееся чтение

finite difference

В этом примере первого значение dataset_1 используются для расчета первое и второе значение dataset_2. Итак, у меня должно быть 2 разных ключа для этого значения. Следовательно, некоторые значения of dataset_1 необходимо прочитать несколько раз (2 или 3 раза).

Я думаю, что мне нужно использовать трансформацию groupBy(key).reduce(Algorithm), но я не знаю, как определить ключи.

ответ

4

Flink DataSets не упорядочен, если вы не обрабатываете их в одном потоке, то есть с параллелизмом 1. Вы можете как каждый добавить последовательный индекс к вашим данным и используйте этот индекс в качестве ключа.

Из вашего примера я предполагаю, что значение с индексом 4 из dataset_2 вычисляется из значений 3, 4 и 5 из набора данных_1, то есть каждое значение dataset_2 выводится из трех (или двух) значений dataset_1.

Существует несколько способов сделать то, что вы хотите, некоторые из них просты в реализации, а другие более эффективны.

простой способ сделать то, что вы хотите, чтобы применить FlatMapFunction на dataset_1, который излучает каждое значение с индексом i три раза с тремя ключами i-1, i и i+1. После этого вы группируете результирующий набор данных на новые ключи и используете функцию GroupReduce для вычисления нового значения. Этот подход увеличивает объем данных данных dataset_1, но может быть легко распараллелен.

Другой вариант - сделать ручное разделение по диапазону, что похоже на первый подход, но немного более общий. Я снова предполагаю, что значения dataset_1 имеют последовательный атрибут idx. Используйте FlatMapFunction для назначения partitionIds значениям, то есть для размера раздела в 100 элементов выполните что-то вроде partitionId = idx/100. Первый и последний элемент раздела необходимо выпустить дважды. Например, элементы с idx 100 и 199 для partitionId 1 (значения от 100 до 199) должны быть реплицированы на разделы 0 и 2 соответственно, дважды испуская эти значения. После присвоения partitionIds вы можете groupBy(partitionId), sortGroup(idx) и groupReduce над всеми элементами раздела. Размер раздела настраивается.

+0

Я не могу принять первое решение, потому что я не могу утроить данные для целей производительности. Для второго решения я не вижу, как обрабатывать перекрывающиеся показания. MapPartition получает раздел как Iterable (по одному элементу за раз), но мне нужно 2 или 3 элемента входного набора данных. Не могли бы вы привести пример, показывающий перекрывающиеся чтения? –

+1

Я расширил описание подхода разбиения диапазона. –

+0

Теперь я понимаю, как мы можем использовать раздел. Благодаря! –

1

Это звучит как вычисление скользящего окна. Вы должны нам DataStream вместо DataSet и применить окно размером 3 и размер шага 1.

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); 

DataStream dataset_2 = env.readTextFile(textPathTo-Dataset_1).window(Count.of(3)).every(Count.of(1)).WINDOW_FUNCTION(...).flatten(); 

Есть несколько WINDOW_FUNCTION доступны (например, тхп, мин, сумма или общая MapWindow, foldWindow, reduceWindow). Пожалуйста, ознакомьтесь с документированием, какая функция подходит для вашего прецедента: https://ci.apache.org/projects/flink/flink-docs-release-0.9/apis/streaming_guide.html

+0

Это решение будет работать только с одним потоком (параллелизм 1), если порядок элементов dataset_1 важен, правильно? –

Смежные вопросы