2016-08-23 2 views
0

У нас возникла проблема в создании asList() метода сортировки.Как создать сортировку вида asList() в SDK Google Dataflow?

Мы думали, что можем сделать это, просто расширив класс View и переопределив метод asList, но поняли, что класс View имеет частный конструктор, поэтому мы не могли этого сделать.

другой Наша попытка была раскошелиться код Google DataFlow на GitHub и изменить PCollectionViews класс вернуть отсортированный список будет с помощью метода Collections.sort, как показано на фрагменте кода ниже

@Override 
protected List<T> fromElements(Iterable<WindowedValue<T>> contents) { 
    Iterable<T> itr = Iterables.transform(
     contents, 
     new Function<WindowedValue<T>, T>() { 
      @SuppressWarnings("unchecked") 
      @Override 
      public T apply(WindowedValue<T> input){ 
      return input.getValue(); 
      } 
     }); 

    LOG.info("#### About to start sorting the list !"); 
    List<T> tempList = new ArrayList<T>(); 
    for (T element : itr) { 
     tempList.add(element); 
    }; 
    Collections.sort((List<? extends Comparable>) tempList); 
    LOG.info("##### List should now be sorted !"); 
    return ImmutableList.copyOf(tempList); 
} 

Обратите внимание, что мы теперь сортируют список.

Это казалось сработавшим при запуске с DirectPipelineRunner, но когда мы попробовали BlockingDataflowPipelineRunner, похоже, что смена кода выполнялась.

Примечание: Мы фактически перекомпилировали поток данных, используемый в нашем проекте, но это не сработало.

Как мы можем достичь этого (как отсортированный список из вызова метода asList)?

ответ

2

Классы в PCollectionViews не предназначены для расширения. Поддерживаются только примитивные типы просмотра, представленные View.asSingleton, View.asSingletonView.asIterable, View.asMap и View.asMultimap.

Чтобы получить отсортированный список из PCollectionView, вам необходимо отсортировать его после того, как вы его прочитали. Следующий код демонстрирует шаблон.

// Assume you have some PCollection 
PCollection<MyComparable> myPC = ...; 

// Prepare it for side input as a list 
final PCollectionView<List<MyComparable> myView = myPC.apply(View.asList()); 

// Side input the list and sort it 
someOtherValue.apply(
    ParDo.withSideInputs(myView).of(
     new DoFn<A, B>() { 
      @Override 
      public void processElement(ProcessContext ctx) { 
      List<MyComparable> tempList = 
       Lists.newArrayList(ctx.sideInput(myView)); 
      Collections.sort(tempList); 
      // do whatever you want with sorted list 
      } 
     })); 

Конечно, вы не можете отсортировать его повторно, в зависимости от стоимости сортировки против стоимости материализовать его в качестве нового PCollection, так что вы можете вывести это значение и прочитать его как новый боковой вход без труда:

// Side input the list, sort it, and put it in a PCollection 
PCollection<List<MyComparable>> sortedSingleton = Create.<Void>of(null).apply(
    ParDo.withSideInputs(myView).of(
     new DoFn<Void, B>() { 
      @Override 
      public void processElement(ProcessContext ctx) { 
      List<MyComparable> tempList = 
       Lists.newArrayList(ctx.sideInput(myView)); 
      Collections.sort(tempList); 
      ctx.output(tempList); 
      } 
     })); 

// Prepare it for side input as a list 
final PCollectionView<List<MyComparable>> sortedView = 
    sortedSingleton.apply(View.asSingleton()); 

someOtherValue.apply(
    ParDo.withSideInputs(sortedView).of(
     new DoFn<A, B>() { 
      @Override 
      public void processElement(ProcessContext ctx) { 
      ... ctx.sideInput(sortedView) ... 
      // do whatever you want with sorted list 
      } 
     })); 

Вы также можете быть заинтересованы в неподдерживаемый модуль sorter CONTRIB для выполнения больших видов, используя как память и локальный диск.

0

Мы попытались сделать это так, как предложил Кен Ноулз. Для больших наборов данных существует проблема. Если tempList большой (так что сортировка занимает некоторое измеримое время, так как она пропорциональна O (n * log n)), и если в «PCO» «someOtherValue» есть миллионы элементов, тогда мы обязательно пересобираем один и тот же список миллионов раз. Мы должны иметь возможность сортировать ONCE и FIRST, прежде чем передавать список в DoFn someOtherValue.apply.

+0

Здравствуйте! Я пропустил ваш параллельный ответ. Я обновил свой ответ, чтобы решить ваши проблемы. Если у вас есть дополнительные вопросы, вы можете добавить их в качестве комментария к моему ответу, чтобы быть уверенным, что я получаю уведомление. –

Смежные вопросы