Сложная часть заключается в том, что вы запускаете код в распределенной системе, поэтому параллельные экземпляры вашей функции ParseData
работают независимо друг от друга.
Вы по-прежнему можете назначать уникальные идентификаторы с помощью локального ID-счетчика в ParseData
. Трюк во избежание дублирования - правильная инициализация и счетчик. Предположим, что у вас параллелизм четырех, вы получите четыре экземпляра ParseData
(назовем их PD1 ... PD4
). Это можно сделать следующие ID задания:
PD1: 0, 4, 8, 12, ...
PD2: 1, 5, 9, 13, ...
PD3, 2, 6, 10, 14, ...
PD4: 3, 7, 11, 15, ...
Вы можете сделать это путем инициализации параллельных экземпляров с различными значениями (подробности ниже) и увеличивать количество в каждом случае ваш параллелизм (т.е. ID += parallelism
).
В Flink все объекты параллельной функции получают уникальный номер, назначенный (так называемый индекс задачи) автоматически. Вы можете просто использовать этот номер для инициализации счетчика ID. Вы можете получить индекс задачи через RuntimeContext.getIndexOfThisSubtask()
. Вы также можете получить оператор/функцию параллелизм через RuntimeContext.getNumberOfParallelSubtasks()
https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/api/common/functions/RuntimeContext.html
Чтобы получить RuntimeContext
использовать RichMapFunction
для реализации ParseData
и вызвать getRuntimeContext()
в open()
.
https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/api/common/functions/RichFunction.html
Нечто подобное (показывать только соответствующие методы):
class ParseDate extends RichMapFunction {
private long parallelism;
private long idCounter;
public void open(Configuration parameters) {
RuntimeContext ctx = getRuntimeContext();
parallelism = ctx.getNumberOfParallelSubtasks();
idCounter = ctx.getIndexOfThisSubtask();
}
public OutputDataType map(InputDataType value) {
OutputDataType output = new OutputDataType();
output.setID(idCounter);
idCounter += parallelism;
// further processing
return output;
}
}
Спасибо, что работал для меня. Мне пришлось добавить 'public void open (Параметры конфигурации)' для его работы. Тем не менее, таким образом, последние идентификаторы не являются непрерывными (при каждом запуске они назначаются по-разному), но я думаю, что это связано с количеством элементов, назначенных каждому экземпляру. –
Исправлен открытый метод в моем ответе - спасибо, что указали его. И да, если данные не распределены одинаково, вы можете не получить последовательный идентификатор, который будет очень тяжелым, так как вам понадобится общее глобальное состояние (которое может сильно повлиять на вашу производительность). Я забыл эту деталь в вашем вопросе. –