2015-07-29 3 views
1

У меня есть Mapper<AvroKey<Email>, NullWritable, Text, Text>, который эффективно берет электронную почту и несколько раз выплевывает ключ адреса электронной почты и значение поля, в котором оно было найдено (от, до, cc и т. Д.). ,Объединение результатов от hadoop map-reduce

Тогда у меня есть Reducer<Text, Text, NullWritable, Text>, который принимает адрес электронной почты и имя поля. Он выплескивает ключ NullWritable и подсчитывает, сколько раз адрес присутствует в заданном поле. например ..

{ 
    "address": "[email protected]", 
    "toCount": 12, 
    "fromCount": 4 
} 

Я использую FileUtil.copyMerge приравнивать выход из рабочих мест, но (очевидно) результаты различных восстановителей не объединены, так что на практике я вижу:

{ 
    "address": "[email protected]", 
    "toCount": 12, 
    "fromCount": 0 
}, { 
    "address": "[email protected]", 
    "toCount": 0, 
    "fromCount": 4 
} 

Есть ли более разумный способ приблизиться к этой проблеме, чтобы я мог получить единственный результат на адрес электронной почты? (Я собираю, что сборщик, выполняющий фазу предварительного сокращения, работает только на подмножестве данных и не гарантированно дает результаты, которые я хочу)?

Edit:

код Reducer будет что-то вроде:

public class EmailReducer extends Reducer<Text, Text, NullWritable, Text> { 

    private static final ObjectMapper mapper = new ObjectMapper(); 

    public void reduce(Text key, Iterable<Text> values, Context context) 
      throws IOException, InterruptedException { 
     Map<String, Map<String, Object>> results = new HashMap<>(); 

     for (Text value : values) { 
      if (!results.containsKey(value.toString())) { 
       Map<String, Object> result = new HashMap<>(); 
       result.put("address", key.toString()); 
       result.put("to", 0); 
       result.put("from", 0); 

       results.put(value.toString(), result); 
      } 

      Map<String, Object> result = results.get(value.toString()); 

      switch (value.toString()) { 
      case "TO": 
       result.put("to", ((int) result.get("to")) + 1); 
       break; 
      case "FROM": 
       result.put("from", ((int) result.get("from")) + 1); 
       break; 
     } 

     results.values().forEach(result -> { 
      context.write(NullWritable.get(), new Text(mapper.writeValueAsString(result))); 
     }); 
    } 
} 
+0

Почему вам нужно много рабочих мест? Вы запускаете код на разных типах ввода (т. Е. В разных форматах)? (Я не знаком с Avro, так что простите, если этот комментарий звучит глупо). Можете ли вы поделиться кодом редуктора (псевдо)? – vefthym

+0

Да, на самом деле это не будет работать против писем, это просто абстракция. – rich

ответ

1

Каждый входной ключ редуктора соответствует уникальный адрес электронной почты, так что вам не нужно results коллекции. Каждый раз, когда метод reduce называется, это для отдельного адреса электронной почты, так что мое предложение:

public class EmailReducer extends Reducer<Text, Text, NullWritable, Text> { 

    private static final ObjectMapper mapper = new ObjectMapper(); 

    public void reduce(Text key, Iterable<Text> values, Context context) 
     throws IOException, InterruptedException { 

    Map<String, Object> result = new HashMap<>(); 
    result.put("address", key.toString()); 
    result.put("to", 0); 
    result.put("from", 0); 

    for (Text value : values) { 
     switch (value.toString()) { 
     case "TO": 
      result.put("to", ((int) result.get("to")) + 1); 
      break; 
     case "FROM": 
      result.put("from", ((int) result.get("from")) + 1); 
      break; 
    } 

    context.write(NullWritable.get(), new Text(mapper.writeValueAsString(result))); 

    } 
} 

Я не уверен, что делает класс ObjectMapper, но я полагаю, что вам это нужно для форматирования вывода. В противном случае я бы напечатал ключ ввода в качестве выходного ключа (т. Е. Адрес электронной почты) и два конкатенированных подсчета для полей «от» и «до» каждого адреса электронной почты.

Если ваш ввод представляет собой сбор данных (т. Е. Не потоки или что-то подобное), вы должны получить каждый адрес электронной почты только один раз. Если ваш ввод задан в потоках, и вам нужно постепенно наращивать свой окончательный вывод, то вывод одного задания может быть входом другого. Если это так, я предлагаю использовать MultipleInputs, в котором один Mapper является тем, который вы описали ранее, и другим IdentityMapper, пересылает вывод предыдущего задания в Reducer. Таким образом, снова тот же адрес электронной почты обрабатывается одной и той же задачей сокращения.

+0

Конечно. Гораздо лучше, спасибо! – rich

+1

(FWIW ObjectMapper - класс Jackson для производства JSON). – rich

+0

@rich спасибо, я посмотрю! – vefthym