У меня есть 2 rdds с различным набором разделителей.Apache Spark: присоединяйтесь к двум RDD с разными разделителями
case class Person(name: String, age: Int, school: String)
case class School(name: String, address: String)
rdd1
является РДД из Person
, который я распределяли на основе age
человека, а затем конвертируется ключ к school
.
val rdd1: RDD[Person] = rdd1.keyBy(person => (person.age, person))
.partitionBy(new HashPartitioner(10))
.mapPartitions(persons =>
persons.map{case(age,person) =>
(person.school, person)
})
rdd2
является РДД из School
сгруппированных по name
школы.
val rdd2: RDD[School] = rdd2.groupBy(_.name)
Теперь rdd1
разделяются в зависимости от возраста человека, так что все люди с тем же возрастом идет на тот же разделы. И, rdd2
разделен (по умолчанию) на основе имени школы.
Я хочу, чтобы rdd1.leftOuterJoin(rdd2)
таким образом, что rdd1
не перетасовывается, потому что rdd1 очень большой по сравнению с rdd2. Кроме того, я выводил результат в Cassandra, который разбит на age
, поэтому текущее разбиение на rdd1
закрепит процесс написания позже.
Есть ли способ, чтобы присоединиться там два РДУ без: 1. Перетасовки rdd1
и 2. Broadcasting «rdd2», потому что rdd2
больше, чем объем доступной памяти.
Примечание: Объединенный rdd должен быть разбит на разделы по возрасту.
Возможно использование подписи 'leftOuterJoin [W] (другое: RDD [(K, W)], разделитель: Partitioner)' и использование того же разделителя, что и rdd1, может помочь. –
Оба являются «HashPartitioner» на разных клавишах. Как указать пользовательский ключ в 'HashPartitioner'? Он принимает только количество разделов в качестве входных данных. – shashwat
Примечание: Размер rdd1 ~ 100GB и размер rdd2 ~ 10GB. У меня есть 15 таких rdd2, которые должны быть соединены с 'rdd1'. И каждый такой маленький rdd (rdd2 здесь) соединяется с разными ключами в rdd1. Чтобы избежать перетасовки rdd1, я разделил его на основе фиксированного ключа, чтобы он не перетасовался. – shashwat