1
Я пытаюсь соединить два потока с помощью Apache Flink потоковый API, но ничего не присоединился, и я понятия не имею, после чтения документации, что я сделал неправильноРегистрация двух потоков не работает
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
DataStream<MyPojo2> source = env.fromCollection(Lists.newArrayList(new MyPojo2(1, "Ola"), new MyPojo2(2, "Ola")))
.assignTimestampsAndWatermarks(new IngestionTimeExtractor<MyPojo2>());
DataStream<MyPojo2> source2 = env.fromCollection(Lists.newArrayList(new MyPojo2(1, "Ela"), new MyPojo2(2, "Ela")))
.assignTimestampsAndWatermarks(new IngestionTimeExtractor<MyPojo2>());
DataStream<Tuple2<String, String>> joined = source.join(source2).where(keySelector).equalTo(keySelector).
window(GlobalWindows.create()).apply(joinFunction);
joined.print();
env.execute("Window");
функция Key просто myPojo.getFirst()
Следовательно, есть ли способ связать два потока в своей полной истории с использованием потокового API? Я бы не хотел использовать пакетный API, потому что я хотел бы получить первый результат как можно скорее, но, возможно, есть некоторый параметр для настройки пакетного API. – Artur
Объединение их в их полную историю невозможно, так как они (технически) бесконечные потоки. Вы можете указать триггер, который периодически срабатывает с заданным интервалом. Таким образом, вы присоединитесь к тому, что в настоящее время существует. Для этого вы, например, будете использовать 'ContinuousProcessingTimeTrigger.of (Time.minutes (40))'. Если вы хотите удалить содержимое при запуске, вы можете также обернуть его в «PurgingTrigger». – aljoscha