Это не решит вашу проблему, но делает ее более удобоваримой. Вы можете разделить цепочку методов и документ, типы которых вы ожидаете на каждом шаге, определяя временные идентификаторы val/def/var с ожидаемым типом. Таким образом, вы можете легко определить, где тип больше не соответствует вашим ожиданиям.
E.g. Я ожидаю, что ваши act1Stream
и act2Stream
экземпляров будут иметь тип DStream[(String, String)]
, на который я позвоню s1
и s2
. Прокомментируйте, если это не так.
def joinedWindow(
s1: DStream[(String, String)],
s2: DStream[(String, String)]
): DStream[...] = {
val w1 = windowedStream(s1)
val w2 = windowedStream(s2)
w1.join(w2)
}
def windowedStream(actStream: DStream[(String, String)]): DStream[Map[...]] = {
val windowed: DStream[(String, String)] = actStream.window(Seconds(5))
windowed.transform(myTransform)
}
def myTransform(rdd: RDD[(String, String)]): RDD[Map[...]] = {
val mapped: RDD[String] = rdd.map(_._2)
// not enough information to conclude
// the result type from given code
mapped.map(l =>(...toMap))
}
Отсюда можно сделать вывод, остальные типы путем заполнения ...
секций. Строка за строкой устраняет ошибки компилятора, пока вы не получите желаемые результаты. С документацией
DStream[T]
def window(windowDuration: Duration): DStream[T]
def transform[U](transformFunc: (RDD[T]) ⇒ RDD[U])(implicit arg0: ClassTag[U]): DStream[U]
PairDStreamFunctions[K,V]
def join[W](other: DStream[(K, W)])(implicit arg0: ClassTag[W]): DStream[(K, (V, W))]
RDD[T]
def map[U](f: (T) ⇒ U)(implicit arg0: ClassTag[U]): RDD[U]
По крайней мере, таким образом, вы дойдете до точки, где вы точно знаете, что ожидаемый тип и полученный тип не совпадают.
Да, 'act1Stream' и' act2Stream' имеют тип 'DStream [Map [String, String]]', но позвольте мне объяснить здесь. У меня есть два разных потока типа 'DStream [(String, String)]', но перед присоединением мне нужно преобразовать каждый поток, а метод transform возвращает 'DStream [Map [String, String]]' и не может присоединиться к нему ... так что есть способ присоединиться к двум потокам 'DStream [Map [String, string]]' –
Я не специалист по искры, но кажется, что операция соединения определена только на 'PairDStreamFunctions [K, V]' s , Подпись говорит мне, что вам нужно иметь два экземпляра типа 'DStream [(K, V)]', чтобы иметь возможность применить операцию соединения. Возможно, вы каким-то образом можете преобразовать '' '' '' '' '' '' '' '' '' '' '' '' '' '' '' '' '' '' '' '' '' '' '' '' '' '' '', чтобы возвращать 'DStream [(String, Map [String, String])]' экземпляры, если это имеет смысл в вашем случае использования как-то ... –
right - will посмотрим, смогу ли это понять, спасибо! –