2015-06-12 1 views
0

Я хочу измерить время выполнения функции combByKey. Я всегда получаю результат 20-22 мс (HashPartitioner) и ~ 350 мс (без учета) с кодом ниже, независимо от используемого размера файла (файл0: ~ 300 кБ, файл1: ~ 3 ГБ, файл2: ~ 8 ГБ)! Может ли это быть правдой? Или я делаю что-то неправильно ???Измерение продолжительности выполнения функции combByKey в Spark

JavaPairRDD<Integer, String> pairRDD = null; 
JavaPairRDD<Integer, String> partitionedRDD = null; 
JavaPairRDD<Integer, Float> consumptionRDD = null; 

boolean partitioning = true; //or false 
int partitionCount = 100;  // between 1 and 200 I cant see any difference in the duration! 

SparkConf conf = new SparkConf(); 
JavaSparkContext sc = new JavaSparkContext(conf); 

input = sc.textFile(path); 
pairRDD = mapToPair(input); 
partitionedRDD = partition(pairRDD, partitioning, partitionsCount); 

long duration = System.currentTimeMillis(); 
consumptionRDD = partitionedRDD.combineByKey(createCombiner, mergeValue, mergeCombiners); 
duration = System.currentTimeMillis() - duration;  // Measured time always the same, independent of file size (~20ms with/~350ms without partitioning) 

// Do an action 
Tuple2<Integer, Float> test = consumptionRDD.takeSample(true, 1).get(0); 

sc.stop(); 

Некоторые вспомогательные методы (не имеет значения):

// merging function for a new dataset 
private static Function2<Float, String, Float> mergeValue = new Function2<Float, String, Float>() { 
    public Float call(Float sumYet, String dataSet) throws Exception { 
     String[] data = dataSet.split(","); 
     float value = Float.valueOf(data[2]); 
     sumYet += value; 
     return sumYet; 
    } 
}; 

// function to sum the consumption 
private static Function2<Float, Float, Float> mergeCombiners = new Function2<Float, Float, Float>() { 
    public Float call(Float a, Float b) throws Exception { 
     a += b; 
     return a; 
    } 
}; 

private static JavaPairRDD<Integer, String> partition(JavaPairRDD<Integer, String> pairRDD, boolean partitioning, int partitionsCount) { 
    if (partitioning) { 
     return pairRDD.partitionBy(new HashPartitioner(partitionsCount)); 
    } else { 
     return pairRDD; 
    } 
} 

private static JavaPairRDD<Integer, String> mapToPair(JavaRDD<String> input) { 
    return input.mapToPair(new PairFunction<String, Integer, String>() { 
     public Tuple2<Integer, String> call(String debsDataSet) throws Exception { 
      String[] data = debsDataSet.split(","); 
      int houseId = Integer.valueOf(data[6]); 
      return new Tuple2<Integer, String>(houseId, debsDataSet); 
     } 
    }); 
} 

ответ

1

веб-интерфейс предоставляет вам подробную информацию о работе/стадии, что ваше приложение запуска. Он детализирует время для каждого из них, и теперь вы можете фильтровать различные детали, такие как Задержка планировщика, Время десериализации задачи и Время выполнения Serialization.

Порт по умолчанию для webui - 8080. Заполненное приложение перечислено там, и вы можете щелкнуть по названию или создать такой URL: xxxx: 8080/history/app- [APPID] для доступа к этим деталям ,

Я не считаю, что существуют какие-либо другие «встроенные» методы для контроля времени выполнения задачи/стадии. В противном случае вы можете пойти глубже и использовать инфраструктуру отладки JVM.

EDIT: combineByKey этого преобразование, что означает, что он не применяется на ваш РДЕ, в отличие от действий (читать далее поведение РДА here, глава 3.1 ленивого). Я полагаю, что разница во времени, которую вы наблюдаете, исходит от времени, которое занимает SPARK для создания фактической структуры данных при разбиении на разделы или нет.

Если разница есть, вы будете видеть его во время действия пользователя (takeSample здесь)

+0

Спасибо за подсказку. Сегодня у меня был более глубокий взгляд на веб-интерфейс, но я не мог найти никакой информации о транзакции combinByKey. Пользовательский интерфейс отображает только информацию о преобразованиях mapToPair и takeSample! –

+0

Какую версию искры вы используете? – Bacon

+0

Я использую Spark 1.2.2 –

Смежные вопросы