2015-11-21 2 views
3

Например, есть два потока. Один из них - это объявления, которые показываются пользователям. Кортеж, в котором можно охарактеризовать (рекламировать, показывать временную метку). Другой - поток кликов - (рекламная, отмеченная меткой времени). Мы хотим получить объединенный поток, который включает в себя всю рекламу, которую пользователь щелкает через 20 минут после показа. Мое решение состоит в объединении этих двух потоков на SlidingTimeWindow. Но в объединенном потоке много повторяющихся кортежей. Как я могу получить присоединенный кортеж только один в новом потоке?Как избежать повторных кортежей в слайд-слайде Flink?

stream1.join(stream2) 
     .where(0) 
     .equalTo(0) 
     .window(SlidingTimeWindows.of(Time.of(30, TimeUnit.MINUTES), Time.of(10, TimeUnit.SECONDS))) 

ответ

4

Решение 1: поддержка Flink

Пусть соединить два потока на отдельные окна, как искры потокового видео. В этом случае реализуйте SlidingTimeWindows (21 мин., 1 мин.) В потоке рекламы и TupblingTimeWindows (1 мин.) В потоке кликов, затем присоединяйтесь к этим двум оконным потокам.

TupblingTimeWindows может избежать дублирования записей в объединенном потоке. 21 мин. Размер SlidingTimeWindows может избежать отсутствующих легальных кликов. Одной из проблем является некорректный щелчок (щелчок через 20 минут) в объединенном потоке. Эта проблема может быть легко устранена путем добавления фильтра.

MultiWindowsJoinedStreams<Tuple2<String, Long>, Tuple2<String, Long>> joinedStreams = 
      new MultiWindowsJoinedStreams<>(advertisement, click); 

    DataStream<Tuple3<String, Long, Long>> joinedStream = joinedStreams.where(keySelector) 
      .window(SlidingTimeWindows.of(Time.of(21, TimeUnit.SECONDS), Time.of(1, TimeUnit.SECONDS))) 
      .equalTo(keySelector) 
      .window(TumblingTimeWindows.of(Time.of(1, TimeUnit.SECONDS))) 
      .apply(new JoinFunction<Tuple2<String, Long>, Tuple2<String, Long>, Tuple3<String, Long, Long>>() { 
       private static final long serialVersionUID = -3625150954096822268L; 

       @Override 
       public Tuple3<String, Long, Long> join(Tuple2<String, Long> first, Tuple2<String, Long> second) throws Exception { 
        return new Tuple3<>(first.f0, first.f1, second.f1); 
       } 
      }); 

    joinedStream = joinedStream.filter(new FilterFunction<Tuple3<String, Long, Long>>() { 
     private static final long serialVersionUID = -4325256210808325338L; 

     @Override 
     public boolean filter(Tuple3<String, Long, Long> value) throws Exception { 
      return value.f1<value.f2&&value.f1+20000>=value.f2; 
     } 
    }); 

Решение 2:

Flink поддерживает операции соединения без окна. Оператор объединения реализует интерфейс. TwoInputStreamOperator поддерживает два буфера (основанные на длительности) этих двух потоков и выводит один объединенный поток.

DataStream<Tuple2<String, Long>> advertisement = env 
      .addSource(new FlinkKafkaConsumer082<String>("advertisement", new SimpleStringSchema(), properties)) 
      .map(new MapFunction<String, Tuple2<String, Long>>() { 
       private static final long serialVersionUID = -6564495005753073342L; 

       @Override 
       public Tuple2<String, Long> map(String value) throws Exception { 
        String[] splits = value.split(" "); 
        return new Tuple2<String, Long>(splits[0], Long.parseLong(splits[1])); 
       } 
      }).keyBy(keySelector).assignTimestamps(timestampExtractor1); 

    DataStream<Tuple2<String, Long>> click = env 
      .addSource(new FlinkKafkaConsumer082<String>("click", new SimpleStringSchema(), properties)) 
      .map(new MapFunction<String, Tuple2<String, Long>>() { 
       private static final long serialVersionUID = -6564495005753073342L; 

       @Override 
       public Tuple2<String, Long> map(String value) throws Exception { 
        String[] splits = value.split(" "); 
        return new Tuple2<String, Long>(splits[0], Long.parseLong(splits[1])); 
       } 
      }).keyBy(keySelector).assignTimestamps(timestampExtractor2); 

    NoWindowJoinedStreams<Tuple2<String, Long>, Tuple2<String, Long>> joinedStreams = 
      new NoWindowJoinedStreams<>(advertisement, click); 
    DataStream<Tuple3<String, Long, Long>> joinedStream = joinedStreams 
      .where(keySelector) 
      .buffer(Time.of(20, TimeUnit.SECONDS)) 
      .equalTo(keySelector) 
      .buffer(Time.of(5, TimeUnit.SECONDS)) 
      .apply(new JoinFunction<Tuple2<String, Long>, Tuple2<String, Long>, Tuple3<String, Long, Long>>() { 
       private static final long serialVersionUID = -5075871109025215769L; 

       @Override 
       public Tuple3<String, Long, Long> join(Tuple2<String, Long> first, Tuple2<String, Long> second) throws Exception { 
        return new Tuple3<>(first.f0, first.f1, second.f1); 
       } 
      }); 

Я реализовал две новые базы операторов объединения в потоковом интерфейсе Flink API TwoInputTransformation. Пожалуйста, проверьте Flink-stream-join. Я добавлю больше тестов в этот репозиторий.

+0

Прохладный раствор! Спасибо, что поделились этим! –

+0

Довольно круто. Однако ссылки только на ответы считаются плохой практикой в ​​SO. Можете ли вы продлить свой ответ? –

+0

Спасибо, я продлю его и сделаю больше тестов на своих решениях. –

1

На вашем коде вы определили перекрывающееся скользящее окно (слайд меньше размера окна). Если вы не хотите иметь дубликаты, вы можете определить неперекрывающееся окно, указав только размер окна (по умолчанию слайд равен размеру окна).

+0

В этом случае есть одна проблема с неперекрывающимся окном. Если в конце первого окна есть рекламный кортеж, в то время как соответствующий кодовый ключ находится в начале второго окна. Затем в объединенном потоке будут отсутствовать некоторые данные. –

+0

Я вижу вашу мысль. Однако вы можете пропустить соответствующие кортежи, если вы используете слайд продолжительностью 10 секунд, если кортеж первых 10 секунд в окне будет соответствовать кортежу, это последние 10 секунд следующего окна. Чтобы не пропустить ни одного совпадающего кортежа, вам нужно построить 30-секундное окно со слайдом * одного кортежа *. Чтобы избежать дубликатов, вы также можете указать «Evictor»: https://ci.apache.org/projects/flink/flink-docs-release-0.10/apis/streaming_guide.html –

+0

Спасибо за ваше предложение. Временное окно со слайдом одного кортежа имеет смысл для меня. Хотя кажется, что я не мог избежать дубликатов с помощью Evictor. –

Смежные вопросы