2016-08-23 2 views
1

Я новичок в Flink, и я работаю с DataSet API. После целой группы обработки в качестве последнего этапа мне нужно нормализовать одно из значений, разделив его на максимальное значение. Итак, я использовал оператор .max(), чтобы взять max, а позже я передаю результат как аргумент конструктора MapFunction.Flink выполняет поток данных дважды

Это работает, однако вся обработка выполняется дважды. Выполняется одно задание для определения максимальных значений, а затем выполняется другое задание для создания конечного результата (начиная с начала) ... Есть ли способ обхода всего потока данных только один раз?

final List<Tuple6<...>> maxValues = result.max(2).collect(); 
    assert maxValues.size() == 1; 
    result.map(new NormalizeAttributes(maxValues.get(0))).writeAsCsv(...) 

@FunctionAnnotation.ForwardedFields("f0; f1; f3; f4; f5") 
@FunctionAnnotation.ReadFields("f2") 
private static class NormalizeAttributes implements MapFunction<Tuple6<...>, Tuple6<...>> { 

    private final Tuple6<...> maxValues; 

    public NormalizeAttributes(Tuple6<...> maxValues) { 
     this.maxValues = maxValues; 
    } 

    @Override 
    public Tuple6<...> map(Tuple6<...> value) throws Exception { 
     value.f2 /= maxValues.f2; 
     return value; 
    } 
} 

ответ

0

collect() немедленно запускает выполнение программы до набора данных, запрошенной collect(). Если позднее вы вызовете env.execute() или collect(), программа будет выполнена второй раз.

Помимо побочного эффекта исполнения, использование collect() для распределения значений для последующего преобразования также имеет недостаток, который передает данные клиенту, а затем обратно в кластер. Flink предлагает так называемые широковещательные переменные для отправки DataSet в качестве дополнительного входа в другое преобразование.

Использование Broadcast переменных в программе будет выглядеть следующим образом:

DataSet maxValues = result.max(2); 
result 
    .map(new NormAttrs()).withBroadcastSet(maxValues, "maxValues") 
    .writeAsCsv(...); 

NormAttrs функция будет выглядеть следующим образом:

private static class NormAttr extends RichMapFunction<Tuple6<...>, Tuple6<...>> { 

    private Tuple6<...> maxValues; 

    @Override 
    public void open(Configuration config) { 
    maxValues = (Tuple6<...>)getRuntimeContext().getBroadcastVariable("maxValues").get(1); 
    } 

    @Override 
    public PredictedLink map(Tuple6<...> value) throws Exception { 
    value.f2 /= maxValues.f2; 
    return value; 
    } 
} 

Вы можете найти более подробную информацию о переменных широковещательных в documentation.

+0

Большое спасибо! ;) – kaser

Смежные вопросы