У меня есть 5 перепутанных ключей-значений rdds, один большой (1,000,000 записей) и 4 относительных малых (100 000 записей). Все rdds были shullfed с тем же количеством разделов, у меня есть две стратегии, чтобы объединить 5,Искра: Как эффективно слить shuffled rdd?
- объединить 5 РД вместе
- объединить 4 маленького РД вместе, а затем присоединиться к BigOne
Я думаю, что стратегия 2 будет более эффективна, так как он бы не повторно тасует большой один. Но результат эксперимента показывает эффективность стратегии . Код и вывод следующий:
Код
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkContext, SparkConf}
object MergeStrategy extends App {
Logger.getLogger("org").setLevel(Level.ERROR)
Logger.getLogger("akka").setLevel(Level.ERROR)
val conf = new SparkConf().setMaster("local[4]").setAppName("test")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
val bigRddSize = 1e6.toInt
val smallRddSize = 1e5.toInt
println(bigRddSize)
val bigRdd = sc.parallelize((0 until bigRddSize)
.map(x => (scala.util.Random.nextInt, 0))).repartition(100).cache
bigRdd.take(10).foreach(println)
val smallRddList = (0 until 4).map(i => {
val rst = sc.parallelize((0 until smallRddSize)
.map(x => (scala.util.Random.nextInt, 0))).repartition(100).cache
println(rst.count)
rst
}).toArray
// strategy 1
{
val begin = System.currentTimeMillis
val s1Rst = sc.union(Array(bigRdd) ++ smallRddList).distinct(100)
println(s1Rst.count)
val end = System.currentTimeMillis
val timeCost = (end - begin)/1000d
println("S1 time count: %.1f s".format(timeCost))
}
// strategy 2
{
val begin = System.currentTimeMillis
val smallMerged = sc.union(smallRddList).distinct(100).cache
println(smallMerged.count)
val s2Rst = bigRdd.fullOuterJoin(smallMerged).flatMap({ case (key, (left, right)) => {
if (left.isDefined && right.isDefined) Array((key, left.get), (key, right.get)).distinct
else if (left.isDefined) Array((key, left.get))
else if (right.isDefined) Array((key, right.get))
else throw new Exception("Cannot happen")
}
})
println(s2Rst.count)
val end = System.currentTimeMillis
val timeCost = (end - begin)/1000d
println("S2 time count: %.1f s".format(timeCost))
}
}
Выход
1000000
(688282474,0)
(-255073127,0)
(872746474,0)
(-792516900,0)
(417252803,0)
(-1514224305,0)
(1586932811,0)
(1400718248,0)
(939155130,0)
(1475156418,0)
100000
100000
100000
100000
1399777
S1 time count: 39.7 s
399984
1399894
S2 time count: 49.8 s
Мое понимание для перемешиваются РДУ было не так? Кто-нибудь может дать какие-то советы? Спасибо!
Вы не присоединяетесь к стратегии 1 (только объединение), пока вы вступаете в стратегию 2. Почему? Помните, что объединение не нужно перетасовывать данные - оно может просто увеличивать RDD, которые присутствуют на каждом исполнителе. Более конкретно, Union создает только узкую зависимость, в то время как соединение создает зависание. Таким образом, кажется, что стратегия 1 и 2 - это яблоки и апельсины. –
@SachinTyagi Моя цель состоит в том, чтобы отличить 5 rdds, стратегии 1 и 2 оба отличные наконец. различные будут перемешать данные. Поскольку большой rdd уже перетасован, стратегия 2 не будет перетасовывать большую, и должна быть более эффективной, но эксперимент показывает обратное. – bourneli
Не совсем уверен, что я понимаю, но всякий раз, когда вы присоединяетесь, вы вводите зависимость в случайном порядке и, таким образом, получаете (пере) перетасовку данных. Независимо от того, был ли ваш rdd перетасован раньше или нет. И это согласуется с тем, что вы видите. –