2014-11-03 2 views
3

Я пытаюсь передать частичную функцию объединению всех RDD, захваченных в пакете DStream через скользящее окно. Допущу, я построить работу окна в течение 10 секунд на потоке дискретизовать в 1 секунду партий:Spark: Как передать PartialFunction в DStream?

val ssc = new StreamingContext(new SparkConf(), Seconds(1)) 
val stream = ssc.socketStream(...) 
val window = stream.window(Seconds(10)) 

Моих window будет иметь K много РДА. Я хочу использовать collect(f: PartialFunction[T, U]) на объединении всех K этих RDD. Я мог бы позвонить оператору объединения ++ с использованием foreachRDD, но я хочу вернуть RDD не Unit и избежать побочных эффектов.

Что я ищу является редуктор, как

def reduce(f: (RDD[T], RDD[T]) ⇒ RDD[T]): RDD[T] 

на DStream, что я могу использовать так:

window.reduce(_ ++ _).transform(_.collect(myPartialFunc)) 

Но это не доступно в Спарк Streaming API.

Есть ли у кого-нибудь хорошие идеи для объединения RDD, захваченных в потоке, в один RDD, чтобы я мог выполнять частичную функцию? Или для реализации моего собственного редуктора RDD? Возможно, эта функция появляется в следующем выпуске Spark?

+0

Функция вычисления позволит вы должны получить RDD в течение определенного периода времени. – Anant

+0

@Anant Где начинается и заканчивается период? Метод 'compute' DStream принимает только параметр validTime. Это начало или конец моего окна? Кроме того, как я буду иметь дело с необходимостью многократно вызывать 'compute' в тот же интервал, что и мои партии? Я ищу что-то менее состоятельное. – nmurthy

+0

@nmurthy Вы не можете «собирать» на DStream. Не могли бы вы объяснить, что вы пытаетесь сделать? Возможно, есть еще один способ. – maasg

ответ

2

Частичные функции напрямую не поддерживаются операцией DStream, но нетрудно достичь такой же функциональности.

Например, давайте возьмем тривиальное частичную функцию, которая принимает строку производит Int строки, если это номер:

val pf:PartialFunction[String,Int] = {case x if (Try(x.toInt).isSuccess) => x.toInt} 

И у нас есть dstream цепочек:

val stringDStream:DStream[String] = ??? // use your stream source here 

Затем мы можем применить частичную функцию к DStream следующим образом:

val intDStream = stringDStream.filter(x => pf.isDefinedAt(x)).map(pf) 
Смежные вопросы