2016-04-26 3 views
1

Я пытаюсь соединить два потока с помощью Apache Flink потоковый API, но ничего не присоединился, и я понятия не имею, после чтения документации, что я сделал неправильноРегистрация двух потоков не работает

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); 
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); 
    DataStream<MyPojo2> source = env.fromCollection(Lists.newArrayList(new MyPojo2(1, "Ola"), new MyPojo2(2, "Ola"))) 
      .assignTimestampsAndWatermarks(new IngestionTimeExtractor<MyPojo2>()); 
    DataStream<MyPojo2> source2 = env.fromCollection(Lists.newArrayList(new MyPojo2(1, "Ela"), new MyPojo2(2, "Ela"))) 
      .assignTimestampsAndWatermarks(new IngestionTimeExtractor<MyPojo2>()); 
    DataStream<Tuple2<String, String>> joined = source.join(source2).where(keySelector).equalTo(keySelector). 
      window(GlobalWindows.create()).apply(joinFunction); 
    joined.print(); 
    env.execute("Window"); 

функция Key просто myPojo.getFirst()

ответ

2

Окно GlobalWindows никогда не срабатывает, если вы не указали обычай Trigger. В вашем примере, если вы используете что-то вроде TumblingEventTimeWindows.of(Time.seconds(5)), вы должны увидеть результаты.

+0

Следовательно, есть ли способ связать два потока в своей полной истории с использованием потокового API? Я бы не хотел использовать пакетный API, потому что я хотел бы получить первый результат как можно скорее, но, возможно, есть некоторый параметр для настройки пакетного API. – Artur

+0

Объединение их в их полную историю невозможно, так как они (технически) бесконечные потоки. Вы можете указать триггер, который периодически срабатывает с заданным интервалом. Таким образом, вы присоединитесь к тому, что в настоящее время существует. Для этого вы, например, будете использовать 'ContinuousProcessingTimeTrigger.of (Time.minutes (40))'. Если вы хотите удалить содержимое при запуске, вы можете также обернуть его в «PurgingTrigger». – aljoscha

Смежные вопросы