2015-06-24 2 views
0

В приведенном ниже коде метод сокращения класса редуктора не выполняется. помогите мне. В моем методе сокращения я хочу написать вывод в нескольких файлах. поэтому я использовал multipleoutputs.Метод уменьшения в классе Reducer не выполняется

public class DataValidation { 

    public static class Map extends Mapper<LongWritable, Text, Text, Text> { 
     int flag = 1; 
     boolean result; 
     private HashMap<String, FileConfig> fileConfigMaps = new HashMap<String,    FileConfig>(); 
     private HashMap<String, List<LineValidator>> mapOfValidators = new HashMap<String, List<LineValidator>>(); 
     private HashMap<String, List<Processor>> mapOfProcessors = new HashMap<String, List<Processor>>(); 

     protected void setup(Context context) throws IOException { 

      System.out.println("configure inside map class"); 
      ConfigurationParser parser = new ConfigurationParser(); 
      Config config = parser.parse(new Configuration()); 
      List<FileConfig> file = config.getFiles(); 
      for (FileConfig f : file) { 
       try { 
        fileConfigMaps.put(f.getName(), f); 
        System.out.println("quotes in" + f.isQuotes()); 
        System.out.println("file from xml : " + f.getName()); 
        ValidationBuilder builder = new ValidationBuilder(); 
        // ProcessorBuilder constructor = new ProcessorBuilder(); 
        List<LineValidator> validators; 
        validators = builder.build(f); 
        // List<Processor> processors = constructor.build(f); 
        mapOfValidators.put(f.getName(), validators); 
        // mapOfProcessors.put(f.getName(),processors); 
       } catch (Exception e) { 
        // TODO Auto-generated catch block 
        e.printStackTrace(); 
       } 

      } 

     } 

     protected void map(LongWritable key, Text value, Context context) 
       throws IOException, InterruptedException { 
      // String filename = ((FileSplit) context.getInputSplit()).getPath() 
      // .getName(); 
      FileSplit fs = (FileSplit) context.getInputSplit(); 
      String fileName = fs.getPath().getName(); 
      System.out.println("filename : " + fileName); 
      String line = value.toString(); 
      String[] csvDataArray = null; 
      List<LineValidator> lvs = mapOfValidators.get(fileName); 
      flag = 1; 
      csvDataArray = line.split(",", -1); 
      FileConfig fc = fileConfigMaps.get(fileName); 
      System.out.println("filename inside fileconfig " + fc.getName()); 
      System.out.println("quote values" + fc.isQuotes()); 
      if (fc.isQuotes()) { 
       for (int i = 0; i < csvDataArray.length; i++) { 
        csvDataArray[i] = csvDataArray[i].replaceAll("\"", ""); 
       } 
      } 
      for (LineValidator lv : lvs) { 
       if (flag == 1) { 
        result = lv.validate(csvDataArray, fileName); 
        if (result == false) { 
         String write = line + "," + lv.getFailureDesc(); 
         System.out.println("write" + write); 
         System.out.println("key" + new Text(fileName)); 
         // output.collect(new Text(filename), new Text(write)); 

         context.write(new Text(fileName), new Text(write)); 

         flag = 0; 
         if (lv.stopValidation(csvDataArray) == true) { 
          break; 
         } 
        } 
       } 
      } 

     } 

     protected void cleanup(Context context) { 
      System.out.println("clean up in mapper"); 
     } 
    } 

    public static class Reduce extends Reducer<Text, Text, NullWritable, Text> { 

     protected void reduce(Text key, Iterator<Text> values, Context context) 
       throws IOException, InterruptedException { 
      System.out.println("inside reduce method"); 
      while (values.hasNext()) { 
       System.out.println(" Nullwritable value" + NullWritable.get()); 
       System.out.println("key inside reduce method" + key.toString()); 
       context.write(NullWritable.get(), values.next()); 
       // out.write(NullWritable.get(), values.next(), "/user/hadoop/" 
       // + context.getJobID() + "/" + key.toString() + "/part-"); 
      } 
     } 

    } 

    public static void main(String[] args) throws Exception { 
     System.out.println("hello"); 

     Configuration configuration = getConf(); 

     Job job = Job.getInstance(configuration); 
     job.setJarByClass(DataValidation.class); 
     job.setMapperClass(Map.class); 
     job.setReducerClass(Reduce.class); 
     job.setMapOutputKeyClass(Text.class); 
     job.setMapOutputValueClass(Text.class); 
     job.setOutputKeyClass(NullWritable.class); 
     job.setOutputValueClass(Text.class); 
     FileInputFormat.addInputPath(job, new Path(args[0])); 
     FileOutputFormat.setOutputPath(job, new Path(args[1])); 

     job.waitForCompletion(true); 

    } 

    private static Configuration getConf() { 
     return new Configuration(); 
    } 

} 

ответ

0

У вас нет должным образом чрезмерного метода уменьшения. Используйте это:

общественного недействительными уменьшить (Key ключ, Iterable значения, контекст Context) бросает IOException, InterruptedException

+0

Охладить свою работу ... –

+0

Спасибо большое ... –

+0

Хорошо знать, что он работает. Пожалуйста, примите ответ, если он решит вашу проблему. Благодарю. –

Смежные вопросы