2015-08-18 3 views
0

У меня есть JavaPairDStream> String, Long> поток и JavaPairRDD> String, Long> пакетное. Теперь я хочу присоединиться к этим двум. В основном я хочу объединить данные в реальном времени с пакетными данными, используя Spark. Но прямое использование JavaPairDStream внутри аргумента для объединения дает ошибку. Какие изменения я должен внести, чтобы присоединиться к этим двум. Также это правильный путь или есть другой способ комбинировать просмотр партии и просмотр в реальном времени в искровом режиме.Регистрация JavaPairRDD и JavaPairDStream в Спарк

Например: stream.leftOuterJoin(batch); Показана ошибка. Я знаю, что это два разных типа данных, но поскольку JavaPairDStream является абстракцией для RDD реального времени, он должен работать.

Любые предложения относительно того, как я могу присоединиться к пакетному просмотру и представлению в реальном времени, было бы полезно. Благодарю.

И сожалеем о неправильном > в JavaPairDStream и JavaPairRDD. Я не смог найти правильный символ эвакуации, чтобы написать его правильно.

ответ

0

Ok я получил решение, как упомянуто здесь Transform Operation

Transform Операция

Преобразование операции (наряду с его вариациями, как transformWith) позволяет произвольные функции РДД-к-РДУ, которые должны применяться на DStream. Его можно использовать для применения любой операции RDD, которая не отображается в API DStream. Например, функциональность объединения каждой партии в потоке данных с другим набором данных напрямую не отображается в API DStream. Однако вы можете легко использовать преобразование для этого. Это дает очень мощные возможности. Например, если вы хотите очистить данные в режиме реального времени, объединив поток входных данных с предварительно вычисленной информацией о спаме (возможно, сгенерированный также с помощью Spark), а затем фильтрацию на основе этого.

import org.apache.spark.streaming.api.java.*; 
// RDD containing spam information 
final JavaPairRDD<String, Double> spamInfoRDD = jssc.sparkContext().newAPIHadoopRDD(...); 
JavaPairDStream<String, Integer> cleanedDStream = wordCounts.transform(new Function<JavaPairRDD<String, Integer>, JavaPairRDD<String, Integer>>() { 
      @Override public JavaPairRDD<String, Integer> call(JavaPairRDD<String, Integer> rdd) throws Exception { 

     rdd.join(spamInfoRDD).filter(...); 
     // join data stream with spam information to do data cleaning 
    ... 
    } 
}); 
Смежные вопросы