Я новичок в Flink, и я работаю с DataSet API. После целой группы обработки в качестве последнего этапа мне нужно нормализовать одно из значений, разделив его на максимальное значение. Итак, я использовал оператор .max()
, чтобы взять max, а позже я передаю результат как аргумент конструктора MapFunction.Flink выполняет поток данных дважды
Это работает, однако вся обработка выполняется дважды. Выполняется одно задание для определения максимальных значений, а затем выполняется другое задание для создания конечного результата (начиная с начала) ... Есть ли способ обхода всего потока данных только один раз?
final List<Tuple6<...>> maxValues = result.max(2).collect();
assert maxValues.size() == 1;
result.map(new NormalizeAttributes(maxValues.get(0))).writeAsCsv(...)
@FunctionAnnotation.ForwardedFields("f0; f1; f3; f4; f5")
@FunctionAnnotation.ReadFields("f2")
private static class NormalizeAttributes implements MapFunction<Tuple6<...>, Tuple6<...>> {
private final Tuple6<...> maxValues;
public NormalizeAttributes(Tuple6<...> maxValues) {
this.maxValues = maxValues;
}
@Override
public Tuple6<...> map(Tuple6<...> value) throws Exception {
value.f2 /= maxValues.f2;
return value;
}
}
Большое спасибо! ;) – kaser