2016-05-05 3 views
1

У меня есть 2 rdds с различным набором разделителей.Apache Spark: присоединяйтесь к двум RDD с разными разделителями

case class Person(name: String, age: Int, school: String) 
case class School(name: String, address: String) 

rdd1 является РДД из Person, который я распределяли на основе age человека, а затем конвертируется ключ к school.

val rdd1: RDD[Person] = rdd1.keyBy(person => (person.age, person)) 
          .partitionBy(new HashPartitioner(10)) 
          .mapPartitions(persons => 
           persons.map{case(age,person) => 
            (person.school, person) 
          }) 

rdd2 является РДД из School сгруппированных по name школы.

val rdd2: RDD[School] = rdd2.groupBy(_.name) 

Теперь rdd1 разделяются в зависимости от возраста человека, так что все люди с тем же возрастом идет на тот же разделы. И, rdd2 разделен (по умолчанию) на основе имени школы.

Я хочу, чтобы rdd1.leftOuterJoin(rdd2) таким образом, что rdd1 не перетасовывается, потому что rdd1 очень большой по сравнению с rdd2. Кроме того, я выводил результат в Cassandra, который разбит на age, поэтому текущее разбиение на rdd1 закрепит процесс написания позже.

Есть ли способ, чтобы присоединиться там два РДУ без: 1. Перетасовки rdd1 и 2. Broadcasting «rdd2», потому что rdd2 больше, чем объем доступной памяти.

Примечание: Объединенный rdd должен быть разбит на разделы по возрасту.

+0

Возможно использование подписи 'leftOuterJoin [W] (другое: RDD [(K, W)], разделитель: Partitioner)' и использование того же разделителя, что и rdd1, может помочь. –

+0

Оба являются «HashPartitioner» на разных клавишах. Как указать пользовательский ключ в 'HashPartitioner'? Он принимает только количество разделов в качестве входных данных. – shashwat

+0

Примечание: Размер rdd1 ~ 100GB и размер rdd2 ~ 10GB. У меня есть 15 таких rdd2, которые должны быть соединены с 'rdd1'. И каждый такой маленький rdd (rdd2 здесь) соединяется с разными ключами в rdd1. Чтобы избежать перетасовки rdd1, я разделил его на основе фиксированного ключа, чтобы он не перетасовался. – shashwat

ответ

1

Предположим, у вас есть два rdds, rdd1 и rdd2 и вы хотите применить операцию соединения. и если rdds разделен (раздел установлен). то вызов rdd3 = rdd1.join (rdd2) сделает rdd3-раздел на rdd1. rdd3 всегда будет принимать хэш-раздел из rdd1 (первый родительский, тот, который был вызван соединением).

Смежные вопросы