2016-09-29 2 views
0

У меня есть 5 перепутанных ключей-значений rdds, один большой (1,000,000 записей) и 4 относительных малых (100 000 записей). Все rdds были shullfed с тем же количеством разделов, у меня есть две стратегии, чтобы объединить 5,Искра: Как эффективно слить shuffled rdd?

  1. объединить 5 РД вместе
  2. объединить 4 маленького РД вместе, а затем присоединиться к BigOne

Я думаю, что стратегия 2 будет более эффективна, так как он бы не повторно тасует большой один. Но результат эксперимента показывает эффективность стратегии . Код и вывод следующий:

Код

import org.apache.log4j.{Level, Logger} 
import org.apache.spark.sql.SQLContext 
import org.apache.spark.{SparkContext, SparkConf} 


object MergeStrategy extends App { 

    Logger.getLogger("org").setLevel(Level.ERROR) 
    Logger.getLogger("akka").setLevel(Level.ERROR) 

    val conf = new SparkConf().setMaster("local[4]").setAppName("test") 
    val sc = new SparkContext(conf) 
    val sqlContext = new SQLContext(sc) 

    val bigRddSize = 1e6.toInt 
    val smallRddSize = 1e5.toInt 
    println(bigRddSize) 

    val bigRdd = sc.parallelize((0 until bigRddSize) 
     .map(x => (scala.util.Random.nextInt, 0))).repartition(100).cache 
    bigRdd.take(10).foreach(println) 

    val smallRddList = (0 until 4).map(i => { 
     val rst = sc.parallelize((0 until smallRddSize) 
      .map(x => (scala.util.Random.nextInt, 0))).repartition(100).cache 
     println(rst.count) 
     rst 
    }).toArray 

    // strategy 1 
    { 
     val begin = System.currentTimeMillis 

     val s1Rst = sc.union(Array(bigRdd) ++ smallRddList).distinct(100) 
     println(s1Rst.count) 

     val end = System.currentTimeMillis 
     val timeCost = (end - begin)/1000d 
     println("S1 time count: %.1f s".format(timeCost)) 
    } 

    // strategy 2 
    { 
     val begin = System.currentTimeMillis 

     val smallMerged = sc.union(smallRddList).distinct(100).cache 
     println(smallMerged.count) 

     val s2Rst = bigRdd.fullOuterJoin(smallMerged).flatMap({ case (key, (left, right)) => { 
      if (left.isDefined && right.isDefined) Array((key, left.get), (key, right.get)).distinct 
      else if (left.isDefined) Array((key, left.get)) 
      else if (right.isDefined) Array((key, right.get)) 
      else throw new Exception("Cannot happen") 
     } 
     }) 
     println(s2Rst.count) 

     val end = System.currentTimeMillis 
     val timeCost = (end - begin)/1000d 
     println("S2 time count: %.1f s".format(timeCost)) 
    } 

} 

Выход

1000000 
(688282474,0) 
(-255073127,0) 
(872746474,0) 
(-792516900,0) 
(417252803,0) 
(-1514224305,0) 
(1586932811,0) 
(1400718248,0) 
(939155130,0) 
(1475156418,0) 
100000 
100000 
100000 
100000 
1399777 
S1 time count: 39.7 s 
399984 
1399894 
S2 time count: 49.8 s 

Мое понимание для перемешиваются РДУ было не так? Кто-нибудь может дать какие-то советы? Спасибо!

+0

Вы не присоединяетесь к стратегии 1 (только объединение), пока вы вступаете в стратегию 2. Почему? Помните, что объединение не нужно перетасовывать данные - оно может просто увеличивать RDD, которые присутствуют на каждом исполнителе. Более конкретно, Union создает только узкую зависимость, в то время как соединение создает зависание. Таким образом, кажется, что стратегия 1 и 2 - это яблоки и апельсины. –

+0

@SachinTyagi Моя цель состоит в том, чтобы отличить 5 rdds, стратегии 1 и 2 оба отличные наконец. различные будут перемешать данные. Поскольку большой rdd уже перетасован, стратегия 2 не будет перетасовывать большую, и должна быть более эффективной, но эксперимент показывает обратное. – bourneli

+0

Не совсем уверен, что я понимаю, но всякий раз, когда вы присоединяетесь, вы вводите зависимость в случайном порядке и, таким образом, получаете (пере) перетасовку данных. Независимо от того, был ли ваш rdd перетасован раньше или нет. И это согласуется с тем, что вы видите. –

ответ

0

я нашел способ, чтобы более эффективно объединить RDD см следующие 2 присоединяемых стратегии:

import org.apache.log4j.{Level, Logger} 
import org.apache.spark.rdd.RDD 
import org.apache.spark.sql.SQLContext 
import org.apache.spark.{HashPartitioner, SparkContext, SparkConf} 
import scala.collection.mutable.ArrayBuffer 

object MergeStrategy extends App { 

    Logger.getLogger("org").setLevel(Level.ERROR) 
    Logger.getLogger("akka").setLevel(Level.ERROR) 

    val conf = new SparkConf().setMaster("local[4]").setAppName("test") 
    val sc = new SparkContext(conf) 
    val sqlContext = new SQLContext(sc) 

    val rddCount = 20 
    val mergeCount = 5 
    val dataSize = 20000 
    val parts = 50 

    // generate data 
    scala.util.Random.setSeed(943343) 
    val testData = for (i <- 0 until rddCount) 
     yield sc.parallelize(scala.util.Random.shuffle((0 until dataSize).toList).map(x => (x, 0))) 
      .partitionBy(new HashPartitioner(parts)) 
      .cache 
    testData.foreach(x => println(x.count)) 

    // strategy 1: merge directly 
    { 
     val buff = ArrayBuffer[RDD[(Int, Int)]]() 
     val begin = System.currentTimeMillis 
     for (i <- 0 until rddCount) { 
      buff += testData(i) 
      if ((buff.size >= mergeCount || i == rddCount - 1) && buff.size > 1) { 
       val merged = sc.union(buff).distinct 
        .partitionBy(new HashPartitioner(parts)).cache 
       println(merged.count) 

       buff.foreach(_.unpersist(false)) 
       buff.clear 
       buff += merged 
      } 
     } 
     val end = System.currentTimeMillis 
     val timeCost = (end - begin)/1000d 
     println("Strategy 1 Time Cost: %.1f".format(timeCost)) 
     assert(buff.size == 1) 

     println("Strategy 1 Complete, with merged Count %s".format(buff(0).count)) 
    } 


    // strategy 2: merge directly without repartition 
    { 
     val buff = ArrayBuffer[RDD[(Int, Int)]]() 
     val begin = System.currentTimeMillis 
     for (i <- 0 until rddCount) { 
      buff += testData(i) 
      if ((buff.size >= mergeCount || i == rddCount - 1) && buff.size > 1) { 
       val merged = sc.union(buff).distinct(parts).cache 
       println(merged.count) 

       buff.foreach(_.unpersist(false)) 
       buff.clear 
       buff += merged 
      } 
     } 
     val end = System.currentTimeMillis 
     val timeCost = (end - begin)/1000d 
     println("Strategy 2 Time Cost: %.1f".format(timeCost)) 
     assert(buff.size == 1) 

     println("Strategy 2 Complete, with merged Count %s".format(buff(0).count)) 
    } 

} 

Результат показывает, что стратегия 1 (время стоить 20,8 секунд) является более эффективной, чем стратегия 2 (время стоимость 34,3 секунд). мой компьютер - это окно 8, процессор 4 ядра 2.0 ГГц, 8 ГБ памяти.

Единственная разница в том, что стратегия разделена HashPartitioner, но стратегия 2 нет. В результате стратегия 1 производит ShuffledRDD, но стратегия 1 MapPartitionsRDD. Я думаю, RDD.distinct функции процессов ShuflledRDD более эффективно, чем MapPartitionsRDD.

Смежные вопросы