2016-09-29 3 views
3

Я заметил, что java apache beam имеет класс groupby.sortbytimestamp, на котором у python реализована эта функция? Если бы не было способа сортировки элементов в окне? Я полагаю, я мог бы сортировать все окно в DoFn, но я хотел бы знать, есть ли лучший способ.Как я могу упорядочить элементы в окне в python apache beam?

+0

Где Вы находите этот класс? Я не думаю, что он существует больше: https://github.com/apache/beam/search?utf8=%E2%9C%93&q=sortbytimestamp&type= – skeller88

ответ

6

В настоящее время нет встроенной сортировки значений в Beam (на Python или Java). Прямо сейчас лучшим вариантом является сортировка значений самостоятельно в DoFn, как вы упомянули.

1

Вот решение, использующее CombineFn. Он имеет дополнительный бонус дедупликации данных с помощью TreeSet. Вы также должны убедиться, что ваши данные для окна достаточно малы, чтобы поместиться в памяти на одного работника.

public static class DedupAndSortByTime extends Combine.CombineFn<MarketData, TreeSet<MarketData>, List<MarketData>> { 
@Override 
public TreeSet<MarketData> createAccumulator() { 
    return new TreeSet<>(Comparator 
      .comparingLong(MarketData::getEventTime) 
      .thenComparing(MarketData::getOrderbookType)); 
} 

@Override 
public TreeSet<MarketData> addInput(TreeSet<MarketData> accum, MarketData input) { 
    accum.add(input); 
    return accum; 
} 

@Override 
public TreeSet<MarketData> mergeAccumulators(Iterable<TreeSet<MarketData>> accums) { 

    TreeSet<MarketData> merged = createAccumulator(); 
    for (TreeSet<MarketData> accum : accums) { 
     merged.addAll(accum); 
    } 
    return merged; 
} 

@Override 
public List<MarketData> extractOutput(TreeSet<MarketData> accum) { 
    return Lists.newArrayList(accum.iterator()); 
} 

}

Смежные вопросы