2015-09-01 3 views

ответ

2

На данный момент нет способа получить индекс подзадачи, на котором работает оператор оконной обработки с запуском TriggerPolicy.

Тем не менее, вы можете обойти это, поставив на место вверху операцию map, которая присваивает каждому элементу данных текущий индекс подзадачи.

DataStream<Tuple2<Integer, String>> ds = env.fromElements(
     new Tuple2<Integer, String>(1, "a"), 
     new Tuple2<Integer, String>(2, "b"), 
     new Tuple2<Integer, String>(1, "c"), 
     new Tuple2<Integer, String>(2, "d")); 

ds.groupBy(0) 
    .map(new RichMapFunction<Tuple2<Integer,String>, Tuple3<Integer, Integer, String>>() { 
     @Override 
     public Tuple3<Integer, Integer, String> map(Tuple2<Integer, String> integerStringTuple2) throws Exception { 
      return new Tuple3<Integer, Integer, String>(
       getRuntimeContext().getIndexOfThisSubtask(), 
       integerStringTuple2.f0, 
       integerStringTuple2.f1); 
     } 
    }) 
    .window(new TestingTriggerPolicy(), new TestingEvictionPolicy()) 
    .mapWindow(new WindowMapFunction<Tuple3<Integer, Integer, String>, String>() { 
     @Override 
     public void mapWindow(Iterable<Tuple3<Integer, Integer, String>> iterable, Collector<String> collector) throws Exception { 
      StringBuilder builder = new StringBuilder(); 

      for (Tuple3<Integer, Integer, String> element : iterable) { 
       builder.append(element.toString() +"; "); 
      } 

      collector.collect(builder.toString()); 
     } 
    }) 
+0

Спасибо, я просто знаю, что локальное окно и его нисходящий оператор (например, mapWindow) обрабатываются последовательно в одном потоке. – zhangshengxiong

+0

@zhangshengxiong, обрабатываемый одним или несколькими потоками, здесь не проблема. Вы должны избегать перетасовки данных, которая разрушит разбиение ваших элементов и, таким образом, отображение ваших элементов в индекс подзадачи. Но пока вы используете локальные окна, это не должно быть проблемой. –

Смежные вопросы