2016-11-21 3 views
1

Я довольно новичок в Spark Streaming, и я застреваю, пытаясь понять, как справиться с этой проблемой, так как я нашел много примеров для одиночных (K, V) пар, но что-то еще. Я был бы признателен за некоторую помощь, чтобы найти лучший подход, используя преобразования Spark с Java.Spark streaming уменьшить на несколько ключевых Java

Позвольте мне вкратце описать сценарий,

Цель состоит в том, чтобы получить коэффициент ошибок набора элементов в пределах временного окна.

Учитывая следующие входные,

(A, Error) 
(B, Success) 
(B, Error) 
(B, Success) 
(C, Success) 
(C, Error) 

Это будет агрегировать элементом, а затем статус (Element, (Number of Success, Number of Error)). В этом случае результат преобразования будет,

(A, (0,1)) 
(B, (2,1)) 
(C, (1,1)) 

И, наконец, вычисление соотношения с использованием функции, такие как (I1, I2) -> i1/(I1 + I2).

(A, 100%) 
(B, 33.3%) 
(C, 50%) 

Насколько я понимаю, результат будет дано reduceByKeyAndWindow() функции, например,

JavaPairDStream<String, Double> res = 
pairs.reduceByKeyAndWindow(reduceFunc, Durations.seconds(30), Durations.seconds(1)); 

После обратного потока приложения, мои вопросы,

Как определить пару на JavaPairDStream с более чем одним значением или ключом (может быть, что-то вроде JavaPairDStream<String, Tuple2<Integer,Integer>>)?

Какой наилучший способ для reduceFunc получить пару с несколькими ключами?

Каков наилучший способ сопоставления исходного DStream (может быть, что-то вроде JavaDStream<Tuple2<String, String>> line = input.map(func))?

Заранее благодарю вас за помощь.

ответ

2

Я уже нашел решение. Работая с классами функций и кортежами, можно найти любую комбинацию, которую вы бы создали с помощью Scala. Проблема в том, что я не нашел никакой документации или примеров, связанных с этим в Java. Ниже вы найдете мое решение, если оно может помочь кому угодно в будущем.

JavaPairDStream<String,String> samples = lines.flatMapToPair(new PairFlatMapFunction<String,String, String>() { 
      public Iterator<Tuple2<String,String>> call(String s) throws Exception { 
       return Arrays.asList(new Tuple2<String, String>(//Some logic on my data//).iterator(); 
      } 
     }); 


JavaPairDStream<Tuple2<String,String>, Integer> samplePairs = samples.mapToPair(
       new PairFunction<Tuple2<String,String>, Tuple2<String,String>, Integer>() { 
        public Tuple2<Tuple2<String,String>, Integer> call(Tuple2<String,String> t) { 
         return new Tuple2<Tuple2<String,String>, Integer>(t, 1); 
        } 
       }); 

     JavaPairDStream<String, Integer> countErrors = samplePairs.filter(new Function<Tuple2<Tuple2<String,String>,Integer>,Boolean>() { 
      public Boolean call(Tuple2<Tuple2<String,String>, Integer> t) 
      { 
       return (t._1._2.equals("Error")); 
      }}).mapToPair(new PairFunction<Tuple2<Tuple2<String,String>,Integer>, String, Integer>() { 
      public Tuple2<String,Integer> call(Tuple2<Tuple2<String,String>,Integer> t) { 
       return new Tuple2(t._1._1,t._2); 
      } 
     }).reduceByKeyAndWindow(new Function2<Integer, Integer, Integer>() { 
      public Integer call(Integer i1, Integer i2) { 
       return i1 + i2; 
      }}, Durations.seconds(30), Durations.seconds(1)); 

     JavaPairDStream<String, Integer> countSuccess= samplePairs.filter(new Function<Tuple2<Tuple2<String,String>,Integer>,Boolean>() { 
      public Boolean call(Tuple2<Tuple2<String,String>, Integer> t) 
      { 
       return (t._1._2.equals("Success")); 
      }}).mapToPair(new PairFunction<Tuple2<Tuple2<String,String>,Integer>, String, Integer>() { 
      public Tuple2<String,Integer> call(Tuple2<Tuple2<String,String>,Integer> t) { 
       return new Tuple2(t._1._1,t._2); 
      } 
     }).reduceByKeyAndWindow(new Function2<Integer, Integer, Integer>() { 
      public Integer call(Integer i1, Integer i2) { 
       return i1 + i2; 
      }}, Durations.seconds(30), Durations.seconds(1)); 

     JavaPairDStream<String,Tuple2<Optional<Integer>,Optional<Integer>>> countPairs = countSuccess.fullOuterJoin(countErrors); 

     JavaPairDStream<String, Double> mappedRDD = countPairs 
       .mapToPair(new PairFunction<Tuple2<String, Tuple2<Optional<Integer>, Optional<Integer>>>, String, Double>() { 
        public Tuple2<String, Double> call(Tuple2<String, Tuple2<Optional<Integer>, Optional<Integer>>> stringTuple2Tuple2) throws Exception { 
         if ((stringTuple2Tuple2._2()._2().isPresent()) && (stringTuple2Tuple2._2()._1().isPresent())) { 
          return new Tuple2<String, Double>(stringTuple2Tuple2._1(), ((double)stringTuple2Tuple2._2()._1().get()/
            ((double)stringTuple2Tuple2._2()._2().get()+(double)stringTuple2Tuple2._2()._1().get()))); 
         } else if (stringTuple2Tuple2._2()._2().isPresent()){ 
          return new Tuple2<String, Double>(stringTuple2Tuple2._1(), 1.0); 
         } else { 
          return new Tuple2<String, Double>(stringTuple2Tuple2._1(), 0.0); 
         } 
        } 
       });