2016-10-31 2 views
1

Я застрял на определенном синтаксисе scala-spark, и я надеюсь, что вы можете вести меня в правильном направлении.Выбор определенных элементов RDD1

, если RDD1 является типом Array [((с плавающей точкой, с плавающей точкой, с плавающей точкой), Лонг)],

RDD1.collect = Array ((x1, y1, z1), 1), ((х2, у2, z2), 2), ((x3, y3, y3), 3), ...)

и RDD2 это индексы, типа, Array [Long],

RDD2.collect = Array (1 , 3, 5 ...)

Каков наилучший способ извлечения значений из RDD1, индексы которых встречаются в RDD2. то есть выход, массив ((x1, y1, z1), 1), ((x3, y3, y3), 3), (x5, y5, y5), 5) ...)

Оба RDD1 и RDD2 достаточно велики, поэтому я бы хотел избежать использования .collect. В противном случае проблема заключается в простом поиске пересекающихся элементов в 2 массивах/списках scala.

Большое вам спасибо за помощь!

ответ

1

На PairRDD есть функция join, которую вы хотите использовать.

// coming in, we have: 
// rdd1: RDD[((Float, Float, Float), Long)] 
// rdd2: RDD[Long] 

val joinReadyRDD1 = rdd1.map { case (values, key) => (key, values) } 
val joinReadyRDD2 = rdd1.map { key => (key,()) } 
val joined = joinReadyRDD1.join(joinReadyRDD2).mapValues(_._1) 

Это возвращает RDD[(Long, (Float, Float, Float))], где появились Long ключи в rdd2.

Примечание: если у вас есть концептуальные «ключ» и «значение», сначала поставьте ключ. Взгляните на PairRDDFunctions, я связал выше - это довольно богатый API, и все это использует RDD[(Key, Value)].

Смежные вопросы