2016-07-28 3 views
0

Я хочу использовать Apache Flink для следующего. У меня есть один основной поток, который должен быть обогащен данными другого потока. Этот основной поток имеет элементы с атрибутами «сайт» и «метка времени». Другой поток (назовем его countrystream) имеет атрибуты «сайт» и «страна». Countrystream должен отслеживать последнюю страну, используемую для сайта. Например, если ("klm.com", "netherlands") прибыл первым, а через некоторое время появился кортеж ("klm.com", "france"), то «klm.com» должен указывать на «france» (поскольку это был последний). Таким образом, он должен поддерживать состояние. Предположим, что кортеж («klm.com», 100) прибыл в основной поток. Теперь это должно быть обогащено до ("klm.com", 100, "france"). Если какой-либо сайт не найден в countrystream, он должен быть обогащен «?». Так, например, ("stackoverflow.com", 150, "?"). Как я могу архивировать это?Обогащение одного потока другим потоком

ответ

0

Я нашел решение (мне потребовалось некоторое время). Является ли это эффективным? Можно ли улучшить? Означает ли это, что у меня нет контрольных точек для моего итеративного потока?

val env = StreamExecutionEnvironment.getExecutionEnvironment 

val mainStream = env.fromElements("a", "a", "b", "a", "a", "b", "b", "a", "c", "b", "a", "c") 
val infoStream = env.fromElements((1, "a", "It is F"), (2, "b", "It is B"), (3, "c", "It is C"), (4, "a", "Whoops, it is A")) 
     .iterate(
      iteration => { 
       (iteration, iteration) 
      } 
     ) 

mainStream 
    .coGroup(infoStream) 
     .where[String]((x: String) => x) 
     .equalTo(_._2) 
     .window(TumblingProcessingTimeWindows.of(Time.seconds(1))) { 
      (first: Iterator[String], second: Iterator[(Int, String, String)], out: Collector[(String, String)]) => { 
       first.foreach((key: String) => { 
         val matchingRecords = second 
          .filter(_._2 == key) 
         if (matchingRecords.nonEmpty) { 
          val matchingRecord = matchingRecords.maxBy(_._1) 
          out.collect((matchingRecord._2, matchingRecord._3)) 
         } 
        } 
       ) 
      } 
     } 
    .print() 

env.execute("proof_of_concept") 
Смежные вопросы