2016-07-27 3 views
0

Я загружаю CSV-файл и преобразовываю каждую строку в POJO с помощью пользовательской функции карты. Для моей логики программы мне нужно для каждого POJO уникальный идентификатор от 0 до n (где n - общие номера строк). Мой вопрос: могу ли я назначить уникальный идентификатор (например, номер начальной строки) для каждого POJO с помощью функции преобразования? Идеальным способом было бы получить Iterable в UDF и увеличивать переменную, итерации через входные кортежи, и, наконец, вывод соответствующего POJO. Мой код в настоящее время выглядит следующим образом:Apache Flink - присвоить уникальный идентификатор для ввода

DataSet<MyType> input = env.readCsvFile("/path/file.csv") 
    .includeFields("1111") 
    .types(String.class, Double.class, Double.class,Double.class) 
    .map(new ParseData()); 

где ParseData превращает кортежи в MyType POJOs.

Есть ли какие-либо рекомендации по достижению этой задачи?

ответ

2

Сложная часть заключается в том, что вы запускаете код в распределенной системе, поэтому параллельные экземпляры вашей функции ParseData работают независимо друг от друга.

Вы по-прежнему можете назначать уникальные идентификаторы с помощью локального ID-счетчика в ParseData. Трюк во избежание дублирования - правильная инициализация и счетчик. Предположим, что у вас параллелизм четырех, вы получите четыре экземпляра ParseData (назовем их PD1 ... PD4). Это можно сделать следующие ID задания:

PD1: 0, 4, 8, 12, ... 
PD2: 1, 5, 9, 13, ... 
PD3, 2, 6, 10, 14, ... 
PD4: 3, 7, 11, 15, ... 

Вы можете сделать это путем инициализации параллельных экземпляров с различными значениями (подробности ниже) и увеличивать количество в каждом случае ваш параллелизм (т.е. ID += parallelism).

В Flink все объекты параллельной функции получают уникальный номер, назначенный (так называемый индекс задачи) автоматически. Вы можете просто использовать этот номер для инициализации счетчика ID. Вы можете получить индекс задачи через RuntimeContext.getIndexOfThisSubtask(). Вы также можете получить оператор/функцию параллелизм через RuntimeContext.getNumberOfParallelSubtasks()

https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/api/common/functions/RuntimeContext.html

Чтобы получить RuntimeContext использовать RichMapFunction для реализации ParseData и вызвать getRuntimeContext() в open().

https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/api/common/functions/RichFunction.html

Нечто подобное (показывать только соответствующие методы):

class ParseDate extends RichMapFunction { 
    private long parallelism; 
    private long idCounter; 

    public void open(Configuration parameters) { 
     RuntimeContext ctx = getRuntimeContext(); 
     parallelism = ctx.getNumberOfParallelSubtasks(); 
     idCounter = ctx.getIndexOfThisSubtask(); 
    } 

    public OutputDataType map(InputDataType value) { 
     OutputDataType output = new OutputDataType(); 
     output.setID(idCounter); 
     idCounter += parallelism; 
     // further processing 
     return output; 
    } 
} 
+0

Спасибо, что работал для меня. Мне пришлось добавить 'public void open (Параметры конфигурации)' для его работы. Тем не менее, таким образом, последние идентификаторы не являются непрерывными (при каждом запуске они назначаются по-разному), но я думаю, что это связано с количеством элементов, назначенных каждому экземпляру. –

+0

Исправлен открытый метод в моем ответе - спасибо, что указали его. И да, если данные не распределены одинаково, вы можете не получить последовательный идентификатор, который будет очень тяжелым, так как вам понадобится общее глобальное состояние (которое может сильно повлиять на вашу производительность). Я забыл эту деталь в вашем вопросе. –

Смежные вопросы