2015-05-19 4 views
0

Я новичок в Spark и Scala, поэтому мой вопрос, вероятно, довольно прост, но я все еще пытаюсь найти ответ. Мне нужно объединить два потока Spark, но у меня проблемы с преобразованием этих потоков в соответствующий формат. Пожалуйста, смотрите мой код ниже:Преобразование потока входных данных в пары значений ключей

val lines7 = ssc.socketTextStream("localhost", 9997) 
val pairs7 = lines7.map(line => (line.split(" ")[0], line)) 

val lines8 = ssc.socketTextStream("localhost", 9998) 
val pairs8 = lines8.map(line => (line.split(" ")[0], line)) 

val newStream = pairs7.join(pairs8) 

Это не работает, потому что функция «присоединиться» ожидает потоки в формате DStream[String, String] и результат функции картографического DStream[(String, String)].

И теперь мой вопрос заключается в том, как закодировать эту функцию карты, чтобы получить соответствующий результат (небольшое объяснение было бы также замечательным)?

Заранее спасибо.

+0

Я не знаю Искра, но из того, что я вижу, [DStream] (https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.streaming .dstream.DStream) принимает только один параметр типа, поэтому 'DStream [String, String]' невозможен. Вы, вероятно, хотите что-то еще, но я не могу догадаться. Либо это, либо 'DStream [(String, String)]' то, что вы хотите, но вы этого еще не знаете. –

+0

Вы импортировали неявные преобразования в 'StreamingContext._'? – maasg

ответ

1

Это работает, как ожидалось:

import org.apache.spark.streaming.{Seconds, StreamingContext} 

val ssc = new StreamingContext(sc, Seconds(30)) 
val lines7 = ssc.socketTextStream("localhost", 9997) 
val pairs7 = lines7.map(line => (line.split(" ")(0), line)) 
val lines8 = ssc.socketTextStream("localhost", 9998) 
val pairs8 = lines8.map(line => (line.split(" ")(0), line)) 
val newStream = pairs7.join(pairs8) 

newStream.foreachRDD(rdd => println(rdd.collect.map(_.toString).mkString(","))) 

ssc.start 

Единственная проблема, которую я вижу ошибку синтаксиса на: line.split(" ")[0] против line.split(" ")(0), но я думаю, что будет замечено компилятором.

Смежные вопросы