2016-12-09 2 views
5

Мы пытаемся выделить одного и того же исполнителя и тот же разделитель для RDD, чтобы избежать сетевого трафика, а также операции тасования, такие как cogroup и join, не имеют границ сцены, и все преобразования завершаются на одном этапе ,Выделение одного и того же разделителя для RDD в Spark

Так для достижения этой цели мы обернуть RDD с нашей пользовательской RDD класса (ExtendRDD.class) в Java, который имеет функцию перекрываться getPreferredLocation от RDD.class (в Скале), как:

public Seq<String> getPreferredLocations(Partition split){ 
     listString.add("11.113.57.142"); 
     listString.add("11.113.57.163"); 
     listString.add("11.113.57.150"); 
     List<String> finalList = new ArrayList<String>(); 
     finalList.add(listString.get(split.index() % listString.size()));    

     Seq<String> toReturnListString = scala.collection.JavaConversions.asScalaBuffer(finalList).toSeq(); 

     return toReturnListString; 
    } 

С этим мы могут управлять поведением искры в отношении того, на каком узле он помещает RDD в кластер. Но теперь проблема заключается в том, что, поскольку разделитель для разных RDD отличается, искра считает их зависящими от перетасовки и снова создает несколько этапов для этих операций тасования. Мы пытались переопределить метод секционирования того же RDD.class в том же специальном РДЕ как:

public Option<Partitioner> partitioner() { 
     Option<Partitioner> optionPartitioner = new Some<Partitioner>(this.getPartitioner()); 
     return optionPartitioner; 
    } 

Для искры, чтобы поставить их под тем же стадией, что необходимо учитывать этот РД, приходят из тех же секционирования. Наш метод разделителя не работает, поскольку искра принимает разный разделитель для 2 RDD и создает несколько этапов для операций тасования.

Мы завернули RDD с Scala наших пользовательским РДОМ как:

ClassTag<String> tag = scala.reflect.ClassTag$.MODULE$.apply(String.class); 
RDD<String> distFile1 = jsc.textFile("SomePath/data.txt",1); 
ExtendRDD<String> extendRDD = new ExtendRDD<String>(distFile1, tag); 

Мы создаем еще один обычай RDD аналогичным образом и получить PairRDD (pairRDD2) из ​​этого РДА. Затем мы пытаемся применить те же разметки, как в объекте extendRDD к объекту PairRDDFunction с помощью функции partitionBy, а затем применить cogroup к этому:

RDD<Tuple2<String, String>> pairRDD = extendRDD.keyBy(new KeyByImpl()); 
PairRDDFunctions<String, String> pair = new PairRDDFunctions<String, String>(pairRDD, tag, tag, null); 
pair.partitionBy(extendRDD2.getPartitioner()); 
pair.cogroup(pairRDD2); 

Все это не похоже на работу, как искра создает несколько этапов, когда он сталкивается с преобразованием cogroup ,

Любые предложения о том, как мы можем применить один и тот же разделитель к RDD?

+0

Вы используете разделение хешей или разделение по разделам –

+0

HashPartitioning –

ответ

0

Я смог успешно сделать один этап для всех своих операций. enter image description here

Смежные вопросы