Я хочу измерить время выполнения функции combByKey. Я всегда получаю результат 20-22 мс (HashPartitioner) и ~ 350 мс (без учета) с кодом ниже, независимо от используемого размера файла (файл0: ~ 300 кБ, файл1: ~ 3 ГБ, файл2: ~ 8 ГБ)! Может ли это быть правдой? Или я делаю что-то неправильно ???Измерение продолжительности выполнения функции combByKey в Spark
JavaPairRDD<Integer, String> pairRDD = null;
JavaPairRDD<Integer, String> partitionedRDD = null;
JavaPairRDD<Integer, Float> consumptionRDD = null;
boolean partitioning = true; //or false
int partitionCount = 100; // between 1 and 200 I cant see any difference in the duration!
SparkConf conf = new SparkConf();
JavaSparkContext sc = new JavaSparkContext(conf);
input = sc.textFile(path);
pairRDD = mapToPair(input);
partitionedRDD = partition(pairRDD, partitioning, partitionsCount);
long duration = System.currentTimeMillis();
consumptionRDD = partitionedRDD.combineByKey(createCombiner, mergeValue, mergeCombiners);
duration = System.currentTimeMillis() - duration; // Measured time always the same, independent of file size (~20ms with/~350ms without partitioning)
// Do an action
Tuple2<Integer, Float> test = consumptionRDD.takeSample(true, 1).get(0);
sc.stop();
Некоторые вспомогательные методы (не имеет значения):
// merging function for a new dataset
private static Function2<Float, String, Float> mergeValue = new Function2<Float, String, Float>() {
public Float call(Float sumYet, String dataSet) throws Exception {
String[] data = dataSet.split(",");
float value = Float.valueOf(data[2]);
sumYet += value;
return sumYet;
}
};
// function to sum the consumption
private static Function2<Float, Float, Float> mergeCombiners = new Function2<Float, Float, Float>() {
public Float call(Float a, Float b) throws Exception {
a += b;
return a;
}
};
private static JavaPairRDD<Integer, String> partition(JavaPairRDD<Integer, String> pairRDD, boolean partitioning, int partitionsCount) {
if (partitioning) {
return pairRDD.partitionBy(new HashPartitioner(partitionsCount));
} else {
return pairRDD;
}
}
private static JavaPairRDD<Integer, String> mapToPair(JavaRDD<String> input) {
return input.mapToPair(new PairFunction<String, Integer, String>() {
public Tuple2<Integer, String> call(String debsDataSet) throws Exception {
String[] data = debsDataSet.split(",");
int houseId = Integer.valueOf(data[6]);
return new Tuple2<Integer, String>(houseId, debsDataSet);
}
});
}
Спасибо за подсказку. Сегодня у меня был более глубокий взгляд на веб-интерфейс, но я не мог найти никакой информации о транзакции combinByKey. Пользовательский интерфейс отображает только информацию о преобразованиях mapToPair и takeSample! –
Какую версию искры вы используете? – Bacon
Я использую Spark 1.2.2 –