Мы пытаемся выделить одного и того же исполнителя и тот же разделитель для RDD, чтобы избежать сетевого трафика, а также операции тасования, такие как cogroup и join, не имеют границ сцены, и все преобразования завершаются на одном этапе ,Выделение одного и того же разделителя для RDD в Spark
Так для достижения этой цели мы обернуть RDD с нашей пользовательской RDD класса (ExtendRDD.class) в Java, который имеет функцию перекрываться getPreferredLocation от RDD.class (в Скале), как:
public Seq<String> getPreferredLocations(Partition split){
listString.add("11.113.57.142");
listString.add("11.113.57.163");
listString.add("11.113.57.150");
List<String> finalList = new ArrayList<String>();
finalList.add(listString.get(split.index() % listString.size()));
Seq<String> toReturnListString = scala.collection.JavaConversions.asScalaBuffer(finalList).toSeq();
return toReturnListString;
}
С этим мы могут управлять поведением искры в отношении того, на каком узле он помещает RDD в кластер. Но теперь проблема заключается в том, что, поскольку разделитель для разных RDD отличается, искра считает их зависящими от перетасовки и снова создает несколько этапов для этих операций тасования. Мы пытались переопределить метод секционирования того же RDD.class в том же специальном РДЕ как:
public Option<Partitioner> partitioner() {
Option<Partitioner> optionPartitioner = new Some<Partitioner>(this.getPartitioner());
return optionPartitioner;
}
Для искры, чтобы поставить их под тем же стадией, что необходимо учитывать этот РД, приходят из тех же секционирования. Наш метод разделителя не работает, поскольку искра принимает разный разделитель для 2 RDD и создает несколько этапов для операций тасования.
Мы завернули RDD с Scala наших пользовательским РДОМ как:
ClassTag<String> tag = scala.reflect.ClassTag$.MODULE$.apply(String.class);
RDD<String> distFile1 = jsc.textFile("SomePath/data.txt",1);
ExtendRDD<String> extendRDD = new ExtendRDD<String>(distFile1, tag);
Мы создаем еще один обычай RDD аналогичным образом и получить PairRDD (pairRDD2) из этого РДА. Затем мы пытаемся применить те же разметки, как в объекте extendRDD к объекту PairRDDFunction с помощью функции partitionBy, а затем применить cogroup к этому:
RDD<Tuple2<String, String>> pairRDD = extendRDD.keyBy(new KeyByImpl());
PairRDDFunctions<String, String> pair = new PairRDDFunctions<String, String>(pairRDD, tag, tag, null);
pair.partitionBy(extendRDD2.getPartitioner());
pair.cogroup(pairRDD2);
Все это не похоже на работу, как искра создает несколько этапов, когда он сталкивается с преобразованием cogroup ,
Любые предложения о том, как мы можем применить один и тот же разделитель к RDD?
Вы используете разделение хешей или разделение по разделам –
HashPartitioning –