2012-05-19 4 views
5

Я пытался написать код, чтобы найти среднее число чисел, используя MapReduce.Найти среднее число, используя MapReduce

Я пытаюсь использовать глобальные счетчики, чтобы достичь своей цели, но я не в состоянии установить значение счетчика в map методе моего Mapper, и я также не смогло retrive значения счетчика в reduce методе моего Reducer ,

Должен ли я использовать глобальный счетчик в map в любом случае (например, используя incrCounter(key, amount) предоставленного Reporter)? Или вы предложите любую другую логику, чтобы получить среднее значение некоторых чисел?

+7

Пожалуйста, пост код у вас есть до сих пор. –

+1

Нет. Это не домашнее задание ... это проект, над которым я работаю .. – Amnesiac

+0

Эй, у меня нет кода ryt сейчас .. я могу опубликовать его в понедельник .., но если вы работали над ним, можете u PLZ помочь ?? – Amnesiac

ответ

5

Логика довольно проста: Если все числа имеют же ключ, то картографа послал все значения, которые вы хотите найти среднее значение с тем же ключом. Из-за этого в редукторе вы можете суммировать значения в итераторе. Затем вы можете сохранить счетчик в течение времени, в течение которого работает итератор, который решает вопрос о том, сколько предметов нужно усреднить. Наконец, после итератора вы можете найти среднее значение, разделив сумму на количество элементов.

Будьте осторожны, эта логика не будет работать, если класс объединитель устанавливается в том же классе, редуктор ...

+0

Я использую класс комбайнера, такой же, как класс редуктора .... – Amnesiac

1

средней суммы/размер. Если сумма представляет собой что-то вроде sum = k1 + k2 + k3 + ..., вы можете разделить по размеру после или во время подведения итогов. Таким образом, в среднем также k1/размер + k2/размер + k3/размер + ...

Java-8 код прост:

public double average(List<Valuable> list) { 
     final int size = list.size(); 
     return list 
      .stream() 
      .mapToDouble(element->element.someValue()) 
      .reduce(0,(sum,x)->sum+x/size); 
    } 

Таким образом, вы сначала отобразить значение каждого из ваших элементов в списке для двойного и последующего суммирования с помощью функции уменьшения.

+0

Использование дополнительный шаг для размера, вы на самом деле не вычисляете среднее значение с уменьшением карты здесь, а только сумму. Сложная часть делает все сразу ... – ftl

0

Среднее арифметическое - это агрегатная функция, которая не является дистрибутивной, но алгебраической. Согласно Han et al. агрегатная функция является распределительной, если:

[...] его можно вычислить [...] следующим образом. Предположим, что данные [..] разделены на n комплектов. Мы применяем эту функцию к каждому разделу, что приводит к совокупным значениям n. Если результат, полученный путем применения функции к совокупным значениям n, совпадает с результатом, полученным путем применения функции ко всему набору данных (без разбиения на разделы), функция может быть вычислена распределенным образом.

Или, другими словами, он должен быть ассоциативным и коммутативным. Агрегатная функция, однако, в соответствии с алгебраической Han et al. если:

[...] она может быть вычислена с помощью алгебраической функции с аргументами (м, где М представляет собой ограниченное положительное целое число), каждый из которых получают путь применения распределяющая агрегатная функция.

Для среднего арифметического это просто ср = сумма/кол. Очевидно, что вам нужно нести счет дополнительно. Но использование глобального счетчика для этого кажется неправильным. API описывает org.apache.hadoop.mapreduce.Counter следующим образом:

Именованная счетчик, который отслеживает ход карты/уменьшить работу.

Счетчики обычно должны использоваться для статистики о работе в любом случае, но не как часть вычислений во время самой обработки данных.

Итак, все, что вам нужно сделать в разделе, - это добавить свои номера вверх и отслеживать их количество вместе с суммой (сумма, счет); простым подходом может быть строка типа <sum><separator><count>.

В картографе счет будет всегда 1, а сумма - это само исходное значение. Чтобы уменьшить файлы карт, вы можете использовать комбайнер и обрабатывать агрегаты, такие как (sum_1 + ... + sum_n, count_1 + ... + count_n). Это необходимо повторить в редукторе и завершить окончательным расчетом сумма/кол-во. Имейте в виду, что этот подход не зависит от используемого ключа!

Наконец вот простой пример использования сырьевой crime statistics of the LAPD, который должен высчитывает «среднее время преступления» в Лос-Анджелесе:

public class Driver extends Configured implements Tool { 
    enum Counters { 
     DISCARDED_ENTRY 
    } 

    public static void main(String[] args) throws Exception { 
     ToolRunner.run(new Driver(), args); 
    } 

    public int run(String[] args) throws Exception { 
     Configuration configuration = getConf(); 

     Job job = Job.getInstance(configuration); 
     job.setJarByClass(Driver.class); 

     job.setMapperClass(Mapper.class); 
     job.setMapOutputKeyClass(LongWritable.class); 
     job.setMapOutputValueClass(Text.class); 

     job.setCombinerClass(Combiner.class); 
     job.setReducerClass(Reducer.class); 
     job.setOutputKeyClass(LongWritable.class); 
     job.setOutputValueClass(Text.class); 

     FileInputFormat.addInputPath(job, new Path(args[0])); 
     FileOutputFormat.setOutputPath(job, new Path(args[1])); 

     return job.waitForCompletion(true) ? 0 : -1; 
    } 
} 

public class Mapper extends org.apache.hadoop.mapreduce.Mapper< 
    LongWritable, 
    Text, 
    LongWritable, 
    Text 
> { 

    @Override 
    protected void map(
     LongWritable key, 
     Text value, 
     org.apache.hadoop.mapreduce.Mapper< 
      LongWritable, 
      Text, 
      LongWritable, 
      Text 
     >.Context context 
    ) throws IOException, InterruptedException { 
      // parse the CSV line 
      ArrayList<String> values = this.parse(value.toString()); 

      // validate the parsed values 
      if (this.isValid(values)) { 

       // fetch the third and the fourth column 
       String time = values.get(3); 
       String year = values.get(2) 
        .substring(values.get(2).length() - 4); 

       // convert time to minutes (e.g. 1542 -> 942) 
       int minutes = Integer.parseInt(time.substring(0, 2)) 
        * 60 + Integer.parseInt(time.substring(2,4)); 

       // create the aggregate atom (a/n) 
       // with a = time in minutes and n = 1 
       context.write(
        new LongWritable(Integer.parseInt(year)), 
        new Text(Integer.toString(minutes) + ":1") 
       ); 
      } else { 
       // invalid line format, so we increment a counter 
       context.getCounter(Driver.Counters.DISCARDED_ENTRY) 
        .increment(1); 
      } 
    } 

    protected boolean isValid(ArrayList<String> values) { 
     return values.size() > 3 
      && values.get(2).length() == 10 
      && values.get(3).length() == 4; 
    } 

    protected ArrayList<String> parse(String line) { 
     ArrayList<String> values = new ArrayList<>(); 
     String current = ""; 
     boolean escaping = false; 

     for (int i = 0; i < line.length(); i++){ 
      char c = line.charAt(i); 

      if (c == '"') { 
       escaping = !escaping; 
      } else if (c == ',' && !escaping) { 
       values.add(current); 
       current = ""; 
      } else { 
       current += c; 
      } 
     } 

     values.add(current); 

     return values; 
    } 
} 

public class Combiner extends org.apache.hadoop.mapreduce.Reducer< 
    LongWritable, 
    Text, 
    LongWritable, 
    Text 
> { 

    @Override 
    protected void reduce(
     LongWritable key, 
     Iterable<Text> values, 
     Context context 
    ) throws IOException, InterruptedException { 
     Long n = 0l; 
     Long a = 0l; 
     Iterator<Text> iterator = values.iterator(); 

     // calculate intermediate aggregates 
     while (iterator.hasNext()) { 
      String[] atom = iterator.next().toString().split(":"); 
      a += Long.parseLong(atom[0]); 
      n += Long.parseLong(atom[1]); 
     } 

     context.write(key, new Text(Long.toString(a) + ":" + Long.toString(n))); 
    } 
} 

public class Reducer extends org.apache.hadoop.mapreduce.Reducer< 
    LongWritable, 
    Text, 
    LongWritable, 
    Text 
> { 

    @Override 
    protected void reduce(
     LongWritable key, 
     Iterable<Text> values, 
     Context context 
    ) throws IOException, InterruptedException { 
     Long n = 0l; 
     Long a = 0l; 
     Iterator<Text> iterator = values.iterator(); 

     // calculate the finale aggregate 
     while (iterator.hasNext()) { 
      String[] atom = iterator.next().toString().split(":"); 
      a += Long.parseLong(atom[0]); 
      n += Long.parseLong(atom[1]); 
     } 

     // cut of seconds 
     int average = Math.round(a/n); 

     // convert the average minutes back to time 
     context.write(
      key, 
      new Text(
       Integer.toString(average/60) 
        + ":" + Integer.toString(average % 60) 
      ) 
     ); 
    } 
} 
Смежные вопросы