Решение 1: поддержка Flink
Пусть соединить два потока на отдельные окна, как искры потокового видео. В этом случае реализуйте SlidingTimeWindows (21 мин., 1 мин.) В потоке рекламы и TupblingTimeWindows (1 мин.) В потоке кликов, затем присоединяйтесь к этим двум оконным потокам.
TupblingTimeWindows может избежать дублирования записей в объединенном потоке. 21 мин. Размер SlidingTimeWindows может избежать отсутствующих легальных кликов. Одной из проблем является некорректный щелчок (щелчок через 20 минут) в объединенном потоке. Эта проблема может быть легко устранена путем добавления фильтра.
MultiWindowsJoinedStreams<Tuple2<String, Long>, Tuple2<String, Long>> joinedStreams =
new MultiWindowsJoinedStreams<>(advertisement, click);
DataStream<Tuple3<String, Long, Long>> joinedStream = joinedStreams.where(keySelector)
.window(SlidingTimeWindows.of(Time.of(21, TimeUnit.SECONDS), Time.of(1, TimeUnit.SECONDS)))
.equalTo(keySelector)
.window(TumblingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
.apply(new JoinFunction<Tuple2<String, Long>, Tuple2<String, Long>, Tuple3<String, Long, Long>>() {
private static final long serialVersionUID = -3625150954096822268L;
@Override
public Tuple3<String, Long, Long> join(Tuple2<String, Long> first, Tuple2<String, Long> second) throws Exception {
return new Tuple3<>(first.f0, first.f1, second.f1);
}
});
joinedStream = joinedStream.filter(new FilterFunction<Tuple3<String, Long, Long>>() {
private static final long serialVersionUID = -4325256210808325338L;
@Override
public boolean filter(Tuple3<String, Long, Long> value) throws Exception {
return value.f1<value.f2&&value.f1+20000>=value.f2;
}
});
Решение 2:
Flink поддерживает операции соединения без окна. Оператор объединения реализует интерфейс. TwoInputStreamOperator поддерживает два буфера (основанные на длительности) этих двух потоков и выводит один объединенный поток.
DataStream<Tuple2<String, Long>> advertisement = env
.addSource(new FlinkKafkaConsumer082<String>("advertisement", new SimpleStringSchema(), properties))
.map(new MapFunction<String, Tuple2<String, Long>>() {
private static final long serialVersionUID = -6564495005753073342L;
@Override
public Tuple2<String, Long> map(String value) throws Exception {
String[] splits = value.split(" ");
return new Tuple2<String, Long>(splits[0], Long.parseLong(splits[1]));
}
}).keyBy(keySelector).assignTimestamps(timestampExtractor1);
DataStream<Tuple2<String, Long>> click = env
.addSource(new FlinkKafkaConsumer082<String>("click", new SimpleStringSchema(), properties))
.map(new MapFunction<String, Tuple2<String, Long>>() {
private static final long serialVersionUID = -6564495005753073342L;
@Override
public Tuple2<String, Long> map(String value) throws Exception {
String[] splits = value.split(" ");
return new Tuple2<String, Long>(splits[0], Long.parseLong(splits[1]));
}
}).keyBy(keySelector).assignTimestamps(timestampExtractor2);
NoWindowJoinedStreams<Tuple2<String, Long>, Tuple2<String, Long>> joinedStreams =
new NoWindowJoinedStreams<>(advertisement, click);
DataStream<Tuple3<String, Long, Long>> joinedStream = joinedStreams
.where(keySelector)
.buffer(Time.of(20, TimeUnit.SECONDS))
.equalTo(keySelector)
.buffer(Time.of(5, TimeUnit.SECONDS))
.apply(new JoinFunction<Tuple2<String, Long>, Tuple2<String, Long>, Tuple3<String, Long, Long>>() {
private static final long serialVersionUID = -5075871109025215769L;
@Override
public Tuple3<String, Long, Long> join(Tuple2<String, Long> first, Tuple2<String, Long> second) throws Exception {
return new Tuple3<>(first.f0, first.f1, second.f1);
}
});
Я реализовал две новые базы операторов объединения в потоковом интерфейсе Flink API TwoInputTransformation. Пожалуйста, проверьте Flink-stream-join. Я добавлю больше тестов в этот репозиторий.
Прохладный раствор! Спасибо, что поделились этим! –
Довольно круто. Однако ссылки только на ответы считаются плохой практикой в SO. Можете ли вы продлить свой ответ? –
Спасибо, я продлю его и сделаю больше тестов на своих решениях. –