2016-07-12 3 views
1

У меня есть два потока кафки, которые содержат результаты для двух параллельных операций. Мне нужен способ объединить оба потока, чтобы я мог обрабатывать результаты в одном искрообразовании. Это возможно? (Рисунок ниже)Объединение двух искровых потоков по ключу

Stream 1 {id:1,result1:True} 
Stream 2 {id:1,result2:False} 
     JOIN(Stream 1, Stream 2, On "id") -> Output Stream {id:1,result1:True,result2:False} 

Текущий код, который не работает:

kvs1 = KafkaUtils.createStream(sparkstreamingcontext, ZOOKEEPER, NAME+"_stream", {"test_join_1": 1}) 
    kvs2 = KafkaUtils.createStream(sparkstreamingcontext, ZOOKEEPER, NAME+"_stream", {"test_join_2": 1}) 

    messages_RDDstream1 = kvs1.map(lambda x: x[1]) 
    messages_RDDstream2 = kvs2.map(lambda x: x[1]) 

    messages_RDDstream_Final = messages_RDDstream1.join(messages_RDDstream2) 

Когда я прохожу два пример jsons к каждой очереди Кафки с тем же ID поля, ничего не возвращается в моем последнем потоке RDD , Я изображаю, что мне не хватает стадии преобразования моего строкового сообщения Kafka JSON в Tuple?

Я также попытался следующие:

kvs1.map(lambda (key, value): json.loads(value)) 

и

kvs1.map(lambda x: json.loads(x)) 

Чтобы без толку

Приветствия

Адам

+0

Оба ваших РДУ состоять только из ключей, вы должны иметь PairRDD правильно использовать 'join' операцию со следующим RDD структурного типа:' (ключ, значение) ' –

ответ

0

Простой поиск по г Спарк в ocumentation дал бы вам ответ.

Вы можете использовать операцию join.

присоединиться к (otherStream, [numTasks]):

При вызове на двух DStreams из (K, V) и (K, W) пар, возвращают новый DStream из (K, (V , W)) пары со всеми парами элементов для каждого ключа.

Например: val streamJoined = stream1.join(stream2)

+0

См. Мой последний ответ –

+0

Пожалуйста, добавьте свои исследовательские усилия в исходный вопрос для лучшей видимости. –

+0

Обновлен мой тестовый код - я представляю себе карту(), что я делаю неправильно, поскольку мне нужно загружать JSON? –

1

Что вам нужно может быть сделано с помощью join() метод DStreams пары ключ-значение:

// Test data 
val input1 = List((1, true), (2, false), (3, false), (4, true), (5, false)) 
val input2 = List((1, false), (2, false), (3, true), (4, true), (5, true)) 

val input1RDD = sc.parallelize(input1) 
val input2RDD = sc.parallelize(input2) 

import org.apache.spark.streaming.{Seconds, StreamingContext} 
val streamingContext = new StreamingContext(sc, Seconds(3)) 
// Creates a DStream from the test data 
import scala.collection.mutable 
val input1DStream = streamingContext.queueStream[(Int, Boolean)](mutable.Queue(input1RDD)) 
val input2DStream = streamingContext.queueStream[(Int, Boolean)](mutable.Queue(input2RDD)) 
// Join the two streams together by merging them into a single dstream 
val joinedDStream = input1DStream.join(input2DStream) 
// Print the result 
joinedDStream.print() 
// Start the context, time out after one batch, and then stop it 
streamingContext.start() 
streamingContext.awaitTerminationOrTimeout(5000) 
streamingContext.stop() 

Результаты в:

-------------------------------------------          
Time: 1468313607000 ms 
------------------------------------------- 
(4,(true,true)) 
(2,(false,false)) 
(1,(true,false)) 
(3,(false,true)) 
(5,(false,true)) 
0

У меня есть соединил два queueStream, используя Spark java. Пожалуйста, смотрите ниже код.

import java.util.ArrayList; 
import java.util.Arrays; 
import java.util.List; 
import java.util.Queue; 

import org.apache.commons.lang3.tuple.Pair; 
import org.apache.spark.SparkConf; 
import org.apache.spark.api.java.JavaRDD; 
import org.apache.spark.api.java.JavaSparkContext; 
import org.apache.spark.api.java.function.PairFunction; 
import org.apache.spark.streaming.Durations; 
import org.apache.spark.streaming.api.java.JavaInputDStream; 
import org.apache.spark.streaming.api.java.JavaPairDStream; 
import org.apache.spark.streaming.api.java.JavaStreamingContext; 

import com.google.common.collect.Queues; 

import scala.Tuple2; 

public class SparkQueueStreamJoin { 

public static void main(String[] args) throws InterruptedException { 

    // Test data 
    List<Pair<Integer, Boolean>> input1 = Arrays.asList(Pair.of(1,true), Pair.of(2,false), Pair.of(3,false), Pair.of(4,true), Pair.of(5,false)); 
    List<Pair<Integer, Boolean>> input2 = Arrays.asList(Pair.of(1,false), Pair.of(2,false), Pair.of(3,true), Pair.of(4,true), Pair.of(5,true)); 

    SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("SparkQueueStreamJoin ") 
      .set("spark.testing.memory", "2147480000"); 
    //System.setProperty("hadoop.home.dir", "C:/H`enter code here`adoop/hadoop-2.7.1"); 
    JavaSparkContext sc = new JavaSparkContext(conf); 

    JavaRDD<Pair<Integer, Boolean>> input1RDD = sc.parallelize(input1); 
    JavaRDD<Pair<Integer, Boolean>> input2RDD = sc.parallelize(input2); 

    JavaStreamingContext streamingContext = new JavaStreamingContext(sc, Durations.seconds(3)); 

    Queue<JavaRDD<Pair<Integer, Boolean>>> queue1RDD = Queues.newLinkedBlockingQueue(); 
    queue1RDD.add(input1RDD); 
    Queue<JavaRDD<Pair<Integer, Boolean>>> queue2RDD = Queues.newLinkedBlockingQueue(); 
    queue2RDD.add(input2RDD); 

    // Creates a DStream from the test data 
    JavaInputDStream<Pair<Integer, Boolean>> input1DStream = streamingContext.queueStream(queue1RDD, false); 
    JavaInputDStream<Pair<Integer, Boolean>> input2DStream = streamingContext.queueStream(queue2RDD, false); 

    JavaPairDStream<Integer, Boolean> pair1DStream = input1DStream.mapToPair(new PairFunction<Pair<Integer, Boolean>, Integer, Boolean>() { 
     @Override 
     public Tuple2<Integer, Boolean> call(Pair<Integer, Boolean> rawEvent) throws Exception { 

      return new Tuple2<>(rawEvent.getKey(), rawEvent.getValue()); 
     } 
    }); 
    JavaPairDStream<Integer, Boolean> pair2DStream = input2DStream.mapToPair(new PairFunction<Pair<Integer, Boolean>, Integer, Boolean>() { 
     @Override 
     public Tuple2<Integer, Boolean> call(Pair<Integer, Boolean> rawEvent) throws Exception { 

      return new Tuple2<>(rawEvent.getKey(), rawEvent.getValue()); 
     } 
    }); 

    // Union two streams together by merging them into a single dstream 
    //JavaDStream<Pair<Integer, Boolean>> joinedDStream = input1DStream.union(input2DStream); 

    // Join the two streams together by merging them into a single dstream 
    JavaPairDStream<Integer, Tuple2<Boolean, Boolean>> joinedDStream = pair1DStream.join(pair2DStream); 
    // Print the result 
    joinedDStream.print(); 
    // Start the context, time out after one batch, and then stop it 
    streamingContext.start(); 
    streamingContext.awaitTerminationOrTimeout(5000); 
    streamingContext.stop(); 
} 
} 

Выход:

------------------------------------------- 
Time: 1511444352000 ms 
------------------------------------------- 
(1,(true,false)) 
(2,(false,false)) 
(3,(false,true)) 
(4,(true,true)) 
(5,(false,true)) 
Смежные вопросы