2016-12-14 3 views
3

Я довольно новичок в Spark, и я использую кластер в основном для целей паралеллизации. У меня есть файл размером 100 Мбайт, каждая строка которого обрабатывается некоторым алгоритмом, который является довольно тяжелой и длительной обработкой.Исходные данные перекоса для небольшого файла

Я хочу использовать кластер из 10 узлов и распараллелить обработку. Я знаю, что размер блока больше 100MB, и я попытался переделать textFile. Если я хорошо понимаю, это repartition метод увеличивает количество разделов:

JavaRDD<String> input = sc.textFile(args[0]); 
input.repartition(10); 

Вопрос заключается в том, что при развертывании в кластере, только один узел эффективно обрабатывать. Как я могу управлять процессом параллельно?

Update 1: вот моя spark-submit команда:

/usr/bin/spark-submit --master yarn --class mypackage.myclass --jars 
myjar.jar 
gs://mybucket/input.txt outfile 

Update 2: После раздела, есть в основном 2 операции:

JavaPairRDD<String, String> int_input = mappingToPair(input); 
JavaPairRDD<String, String> output = mappingValues(int_input, option); 
output.saveAsTextFile("hdfs://..."); 

где mappingToPair(...) является

public JavaPairRDD<String, String> mappingToPair(JavaRDD<String> input){ 
     return input.mapToPair(new PairFunction<String, String, String>() { 
      public Tuple2<String, String> call(String line) { 
       String[] arrayList = line.split("\t", 2); 
       return new Tuple2(arrayList[0], arrayList[1]); 
      } 
     }); 
    } 

и mappingValues(...) является метод следующего типа:

public JavaPairRDD<String,String> mappingValues(JavaPairRDD<String,String> rdd, final String option){ 
     return rdd.mapValues(
       new Function<String, String>() { 
        // here the algo processing takes place... 
       } 
     ) 
} 
+0

Возможно, добавьте еще код, описывающий, что происходит после передела, иначе вопрос не ясен. – Chobeat

+0

также покажут нам, что вы запускали искру submit cmd. –

+0

@UmbertoGriffo вот команда – Newben

ответ

2

Там может быть несколько вопросов здесь:

  1. Файл только один блок большой. Чтение этого с помощью нескольких исполнителей вообще нецелесообразно, поскольку узел HDFS может обслуживать один узел с полной скоростью или два узла с половиной скорости (плюс накладные расходы) и т. Д. Счет исполнителей становится полезным (для шага чтения), когда вы имеют несколько блоков, разбросанных по различным узлам HDFS.
  2. Возможно также, что вы сохраняете файл в нераспадаемом сжатом формате, поэтому шаг ввода может читать его только с одним исполнителем, даже если он будет в 100 раз больше размера блока.
  3. Вы не связываете вызов repartition(10) в свой поток, поэтому он не эффективен. Если вы замените эту строку: input.repartition(10); с этим: input = input.repartition(10); он будет использоваться, и он должен разделить RDD на несколько, прежде чем переходить к следующему шагу.

Обратите внимание, что перераспределение может сделать ваш процесс еще более продолжительным, поскольку данные должны быть разделены и переданы другим машинам, что может быть легко устранено медленной сетью.

Это особенно актуально, если вы используете режим развертывания клиента. Это означает, что первым исполнителем (драйвером) является ваш локальный экземпляр Spark, который вы отправляете. Поэтому он сначала загрузит все данные в драйвер из кластера, а затем загрузит их обратно в другие узлы YARN после разделения.

Я мог бы продолжить об этом, но главное, что я пытаюсь сказать: процесс может даже работать быстрее на одном исполнителе, если ваш алгоритм очень прост, вместо того, чтобы разбивать на разделы, передавать и затем запускать алгоритм на всех исполнителей параллельно.

+1

спасибо, что это сработало;) – Newben

Смежные вопросы