Среднее арифметическое - это агрегатная функция, которая не является дистрибутивной, но алгебраической. Согласно Han et al. агрегатная функция является распределительной, если:
[...] его можно вычислить [...] следующим образом. Предположим, что данные [..] разделены на n комплектов. Мы применяем эту функцию к каждому разделу, что приводит к совокупным значениям n. Если результат, полученный путем применения функции к совокупным значениям n, совпадает с результатом, полученным путем применения функции ко всему набору данных (без разбиения на разделы), функция может быть вычислена распределенным образом.
Или, другими словами, он должен быть ассоциативным и коммутативным. Агрегатная функция, однако, в соответствии с алгебраической Han et al. если:
[...] она может быть вычислена с помощью алгебраической функции с аргументами (м, где М представляет собой ограниченное положительное целое число), каждый из которых получают путь применения распределяющая агрегатная функция.
Для среднего арифметического это просто ср = сумма/кол. Очевидно, что вам нужно нести счет дополнительно. Но использование глобального счетчика для этого кажется неправильным. API описывает org.apache.hadoop.mapreduce.Counter
следующим образом:
Именованная счетчик, который отслеживает ход карты/уменьшить работу.
Счетчики обычно должны использоваться для статистики о работе в любом случае, но не как часть вычислений во время самой обработки данных.
Итак, все, что вам нужно сделать в разделе, - это добавить свои номера вверх и отслеживать их количество вместе с суммой (сумма, счет); простым подходом может быть строка типа <sum><separator><count>
.
В картографе счет будет всегда 1, а сумма - это само исходное значение. Чтобы уменьшить файлы карт, вы можете использовать комбайнер и обрабатывать агрегаты, такие как (sum_1 + ... + sum_n, count_1 + ... + count_n). Это необходимо повторить в редукторе и завершить окончательным расчетом сумма/кол-во. Имейте в виду, что этот подход не зависит от используемого ключа!
Наконец вот простой пример использования сырьевой crime statistics of the LAPD, который должен высчитывает «среднее время преступления» в Лос-Анджелесе:
public class Driver extends Configured implements Tool {
enum Counters {
DISCARDED_ENTRY
}
public static void main(String[] args) throws Exception {
ToolRunner.run(new Driver(), args);
}
public int run(String[] args) throws Exception {
Configuration configuration = getConf();
Job job = Job.getInstance(configuration);
job.setJarByClass(Driver.class);
job.setMapperClass(Mapper.class);
job.setMapOutputKeyClass(LongWritable.class);
job.setMapOutputValueClass(Text.class);
job.setCombinerClass(Combiner.class);
job.setReducerClass(Reducer.class);
job.setOutputKeyClass(LongWritable.class);
job.setOutputValueClass(Text.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
return job.waitForCompletion(true) ? 0 : -1;
}
}
public class Mapper extends org.apache.hadoop.mapreduce.Mapper<
LongWritable,
Text,
LongWritable,
Text
> {
@Override
protected void map(
LongWritable key,
Text value,
org.apache.hadoop.mapreduce.Mapper<
LongWritable,
Text,
LongWritable,
Text
>.Context context
) throws IOException, InterruptedException {
// parse the CSV line
ArrayList<String> values = this.parse(value.toString());
// validate the parsed values
if (this.isValid(values)) {
// fetch the third and the fourth column
String time = values.get(3);
String year = values.get(2)
.substring(values.get(2).length() - 4);
// convert time to minutes (e.g. 1542 -> 942)
int minutes = Integer.parseInt(time.substring(0, 2))
* 60 + Integer.parseInt(time.substring(2,4));
// create the aggregate atom (a/n)
// with a = time in minutes and n = 1
context.write(
new LongWritable(Integer.parseInt(year)),
new Text(Integer.toString(minutes) + ":1")
);
} else {
// invalid line format, so we increment a counter
context.getCounter(Driver.Counters.DISCARDED_ENTRY)
.increment(1);
}
}
protected boolean isValid(ArrayList<String> values) {
return values.size() > 3
&& values.get(2).length() == 10
&& values.get(3).length() == 4;
}
protected ArrayList<String> parse(String line) {
ArrayList<String> values = new ArrayList<>();
String current = "";
boolean escaping = false;
for (int i = 0; i < line.length(); i++){
char c = line.charAt(i);
if (c == '"') {
escaping = !escaping;
} else if (c == ',' && !escaping) {
values.add(current);
current = "";
} else {
current += c;
}
}
values.add(current);
return values;
}
}
public class Combiner extends org.apache.hadoop.mapreduce.Reducer<
LongWritable,
Text,
LongWritable,
Text
> {
@Override
protected void reduce(
LongWritable key,
Iterable<Text> values,
Context context
) throws IOException, InterruptedException {
Long n = 0l;
Long a = 0l;
Iterator<Text> iterator = values.iterator();
// calculate intermediate aggregates
while (iterator.hasNext()) {
String[] atom = iterator.next().toString().split(":");
a += Long.parseLong(atom[0]);
n += Long.parseLong(atom[1]);
}
context.write(key, new Text(Long.toString(a) + ":" + Long.toString(n)));
}
}
public class Reducer extends org.apache.hadoop.mapreduce.Reducer<
LongWritable,
Text,
LongWritable,
Text
> {
@Override
protected void reduce(
LongWritable key,
Iterable<Text> values,
Context context
) throws IOException, InterruptedException {
Long n = 0l;
Long a = 0l;
Iterator<Text> iterator = values.iterator();
// calculate the finale aggregate
while (iterator.hasNext()) {
String[] atom = iterator.next().toString().split(":");
a += Long.parseLong(atom[0]);
n += Long.parseLong(atom[1]);
}
// cut of seconds
int average = Math.round(a/n);
// convert the average minutes back to time
context.write(
key,
new Text(
Integer.toString(average/60)
+ ":" + Integer.toString(average % 60)
)
);
}
}
Пожалуйста, пост код у вас есть до сих пор. –
Нет. Это не домашнее задание ... это проект, над которым я работаю .. – Amnesiac
Эй, у меня нет кода ryt сейчас .. я могу опубликовать его в понедельник .., но если вы работали над ним, можете u PLZ помочь ?? – Amnesiac