2015-04-09 4 views
13

мне нужно разбить RDD на 2 части:Apache фильтр Спарк РДД на две РДУ

1 часть, которая удовлетворяет условию; другая часть, которой нет. Я могу сделать filter дважды на оригинальном RDD, но это кажется неэффективным. Есть ли способ, который может сделать то, что мне нужно? Я не могу найти ничего в API и в литературе.

ответ

17

Спарк не поддерживает это, по умолчанию. Фильтрация на одних и тех же данных дважды не так уж плоха, если вы предварительно ее кешируете, а сама фильтрация выполняется быстро.

Если это действительно только два различных типа, вы можете использовать вспомогательный метод:

implicit class RDDOps[T](rdd: RDD[T]) { 
    def partitionBy(f: T => Boolean): (RDD[T], RDD[T]) = { 
    val passes = rdd.filter(f) 
    val fails = rdd.filter(e => !f(e)) // Spark doesn't have filterNot 
    (passes, fails) 
    } 
} 

val (matches, matchesNot) = sc.parallelize(1 to 100).cache().partitionBy(_ % 2 == 0) 

Но как только у вас есть несколько типов данных, просто назначить отфильтрованный на новый вал.

+0

работает ли этот подход с API-интерфейсом Spark Java? –

+0

Нет, Java не имеет методов расширения. –

+1

Не следует использовать 'rdd.cache()' перед запуском фильтров? Это, безусловно, должно увеличить скорость вашего второго фильтра. –

0

Если вы в порядке с T вместо RDD[T], то вы можете do this. В противном случае, вы могли бы, возможно, сделать что-то вроде этого:

val data = sc.parallelize(1 to 100) 
val splitData = data.mapPartitions{iter => { 
    val splitList = (iter.toList).partition(_%2 == 0) 
    Tuple1(splitList).productIterator 
    } 
}.map(_.asInstanceOf[Tuple2[List[Int],List[Int]]]) 

А, то вам, вероятно, нужно уменьшить это вниз, чтобы объединить списки, когда вы идете, чтобы выполнить действие

+0

Я хотел бы узнать, почему это было вниз проголосовали, так как это единственный ответ, который на самом деле отвечает на OPs вопрос –

+0

(примечание: я не вниз голосование) ваш метод интересен но он не отвечает на вопрос. OP запросил «раздел (RDD [A], A => Boolean): (RDD [A], RDD [A])', ваш будет 'partition (RDD [A], A => Boolean): RDD [Список [A], Список [A]] ' –

3

Spark RDD не имеет такого api.

Вот версия, основанная на pull request for rdd.span, который должен работать:

import scala.reflect.ClassTag 
import org.apache.spark.rdd._ 

def split[T:ClassTag](rdd: RDD[T], p: T => Boolean): (RDD[T], RDD[T]) = { 

    val splits = rdd.mapPartitions { iter => 
     val (left, right) = iter.partition(p) 
     val iterSeq = Seq(left, right) 
     iterSeq.iterator 
    } 

    val left = splits.mapPartitions { iter => iter.next().toIterator} 

    val right = splits.mapPartitions { iter => 
     iter.next() 
     iter.next().toIterator 
    } 
    (left, right) 
} 

val rdd = sc.parallelize(0 to 10, 2) 

val (first, second) = split[Int](rdd, _ % 2 == 0) 

first.collect 
// Array[Int] = Array(0, 2, 4, 6, 8, 10) 
+0

Также проверьте это: https://issues.apache.org/jira/browse/SPARK-3533 –

+1

Я бы сделал ставку более сложным и менее эффективным, чем два фильтра –

+0

@JustinPihony да, фильтры намного эффективнее. –

3

Дело в том, вы не хотите, чтобы сделать фильтр, но карта.

(T) -> (Boolean, T) 

Извините, я неэффективен в синтаксисе Scala. Но идея состоит в том, что вы разделяете свой ответ, сопоставляя его с парами Key/Value. Ключ может быть логическим, указывающим на увядание или не передавая предикат «Фильтр».

+0

Мне это нравится с точки зрения простоты –

0

Вы можете использовать subtract function (Если операция фильтра слишком дорога).

код PySpark:

rdd1 = data.filter(filterFunction) 

rdd2 = data.subtract(rdd1) 
Смежные вопросы