2016-03-21 3 views
1

У меня проблема, когда мне нужно преобразовать два потока, читающих от искры до присоединения.Spark streaming - преобразуйте два потока и присоединитесь

После того, как я делаю преобразование, я больше не могу присоединиться, я не думаю, тип больше не DStream [(String, String)], но DStream [Карта [String, String]]

val windowStream1 = act1Stream.window(Seconds(5)).transform{rdd => rdd.map(_._2).map(l =>(...toMap)} 
val windowStream2 = act2Stream.window(Seconds(5)).transform{rdd => rdd.map(_._2).map(l =>(...toMap)} 

val joinedWindow = windowStream1.join(windowStream2) //can't join 

Любая идея ?

ответ

0

Это не решит вашу проблему, но делает ее более удобоваримой. Вы можете разделить цепочку методов и документ, типы которых вы ожидаете на каждом шаге, определяя временные идентификаторы val/def/var с ожидаемым типом. Таким образом, вы можете легко определить, где тип больше не соответствует вашим ожиданиям.

E.g. Я ожидаю, что ваши act1Stream и act2Stream экземпляров будут иметь тип DStream[(String, String)], на который я позвоню s1 и s2. Прокомментируйте, если это не так.

def joinedWindow(
     s1: DStream[(String, String)], 
     s2: DStream[(String, String)] 
    ): DStream[...] = { 
    val w1 = windowedStream(s1) 
    val w2 = windowedStream(s2) 
    w1.join(w2) 
} 
def windowedStream(actStream: DStream[(String, String)]): DStream[Map[...]] = { 
    val windowed: DStream[(String, String)] = actStream.window(Seconds(5)) 
    windowed.transform(myTransform) 
} 
def myTransform(rdd: RDD[(String, String)]): RDD[Map[...]] = { 
    val mapped: RDD[String] = rdd.map(_._2) 
    // not enough information to conclude 
    // the result type from given code 
    mapped.map(l =>(...toMap)) 
} 

Отсюда можно сделать вывод, остальные типы путем заполнения ... секций. Строка за строкой устраняет ошибки компилятора, пока вы не получите желаемые результаты. С документацией

DStream[T]

  • def window(windowDuration: Duration): DStream[T]
  • def transform[U](transformFunc: (RDD[T]) ⇒ RDD[U])(implicit arg0: ClassTag[U]): DStream[U]

PairDStreamFunctions[K,V]

  • def join[W](other: DStream[(K, W)])(implicit arg0: ClassTag[W]): DStream[(K, (V, W))]

RDD[T]

  • def map[U](f: (T) ⇒ U)(implicit arg0: ClassTag[U]): RDD[U]

По крайней мере, таким образом, вы дойдете до точки, где вы точно знаете, что ожидаемый тип и полученный тип не совпадают.

+0

Да, 'act1Stream' и' act2Stream' имеют тип 'DStream [Map [String, String]]', но позвольте мне объяснить здесь. У меня есть два разных потока типа 'DStream [(String, String)]', но перед присоединением мне нужно преобразовать каждый поток, а метод transform возвращает 'DStream [Map [String, String]]' и не может присоединиться к нему ... так что есть способ присоединиться к двум потокам 'DStream [Map [String, string]]' –

+0

Я не специалист по искры, но кажется, что операция соединения определена только на 'PairDStreamFunctions [K, V]' s , Подпись говорит мне, что вам нужно иметь два экземпляра типа 'DStream [(K, V)]', чтобы иметь возможность применить операцию соединения. Возможно, вы каким-то образом можете преобразовать '' '' '' '' '' '' '' '' '' '' '' '' '' '' '' '' '' '' '' '' '' '' '' '' '' '' '', чтобы возвращать 'DStream [(String, Map [String, String])]' экземпляры, если это имеет смысл в вашем случае использования как-то ... –

+0

right - will посмотрим, смогу ли это понять, спасибо! –

Смежные вопросы