2015-02-05 2 views
3

Тот же вопрос также относится к разделению RDD на несколько новых RDD.Искры расщепления DStream на несколько RDD

DStream или RDD содержит несколько различных классов case, и мне нужно превратить их в отдельные RDD на основе типа класса case.

Я знаю

val newRDD = rdd.filter { a => a.getClass.getSimpleName == "CaseClass1" } 

или

val newRDD = rdd.filter { 
    a => a match { 
    case _: CC1 => true 
    case _ => false 
    } 
} 

Но это требует много прогонов через оригинальный РД, по одному на случай типа класса.

  1. Должен быть более сжатый способ сделать указанный фильтр соответствия?
  2. Есть ли способ разделить rdd на несколько по типу элемента с одним параллельным проходом?

ответ

1

Похожее с rdd.filter Я был на правильном пути с длинной формой. Немного более лаконичная версия:

val newRDD = rdd.filter { case _: CC1 => true ; case _ => false } 

Вы не можете оставить из case _ => false или тест для класса не является исчерпывающим, и вы получите ошибки. Я не мог заставить сбор правильно работать.

@maasg получает кредит за правильный ответ о выполнении отдельных проходов фильтра, а не взламывает способ разделить вход за один проход.

4

1) Более краткий способ фильтрации для данного типа заключается в использовании rdd.collect(PartialFunction[T,U])

эквивалентом

val newRDD = rdd.filter { a => a.getClass.getSimpleName == "CaseClass1" } 

будет:

val newRDD = rdd.collect{case c:CaseClass1 => c} 

Это может быть даже в сочетании с дополнительной фильтрацией и трансформацией:

val budgetRDD = rdd.collect{case c:CaseClass1 if (c.customer == "important") => c.getBudget} 

rdd.collect(p:PartialFunction[T,U])не следует путать с rdd.collect() который доставляет данные обратно к водителю.


2) Для того, чтобы разбить RDD (или DStream по этому вопросу), filter это путь. Нужно помнить, что RDD - это распределенная коллекция. Фильтр позволит вам применить функцию к подмножеству этой распределенной коллекции параллельно по кластеру.

Структурное создание 2 или более RDD от оригинального RDD может привести к тасованию 1-к-многим, что будет значительно дороже.

+0

Хорошо, но я действительно не хочу собирать? Я отредактировал выше, чтобы присвоить возвращаемый фильтр rdd. Другими словами, мне нужен фильтр, верно? – pferrel

+0

Я думаю, это потому, что вернуть значение из закрытия проще, чем логическое? – pferrel

+0

отредактировал ответ, чтобы показать, что вызов возвращает RDD. Как я уже говорил, rdd.collect() и rdd.collect {case x => x.y} имеют две совершенно разные цели. – maasg

Смежные вопросы