2015-06-02 2 views
3

Я знаю, что SortComparator используется для сортировки вывода карты по их ключам. Я написал собственный SortComparator, чтобы лучше понять структуру MapReduce. Это мой класс WordCount с пользовательским классом SortComparator.NullPointerException в программе MapReduce для сортировки

package bananas; 

import java.io.FileWriter; 
import java.io.IOException; 
import java.util.StringTokenizer; 

import org.apache.hadoop.conf.Configuration; 
import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.io.IntWritable; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.io.WritableComparable; 
import org.apache.hadoop.io.WritableComparator; 
import org.apache.hadoop.mapreduce.Job; 
import org.apache.hadoop.mapreduce.Mapper; 
import org.apache.hadoop.mapreduce.Partitioner; 
import org.apache.hadoop.mapreduce.Reducer; 
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 

public class WordCount { 


    public static class TokenizerMapper 
     extends Mapper<Object, Text, Text, IntWritable>{ 

    private final static IntWritable one = new IntWritable(1); 
    private Text word = new Text(); 

    public void map(Object key, Text value, Context context 
        ) throws IOException, InterruptedException { 
     StringTokenizer itr = new StringTokenizer(value.toString()); 

     while (itr.hasMoreTokens()) { 
     word.set(itr.nextToken()); 
     context.write(word, one); 

     } 
    } 
    } 

    public static class IntSumReducer 
     extends Reducer<Text,IntWritable,Text,IntWritable> { 
    private IntWritable result = new IntWritable();  
    public void reduce(Text key, Iterable<IntWritable> values, 
         Context context 
         ) throws IOException, InterruptedException { 


     int sum = 0; 
     for (IntWritable val : values) { 
     sum += val.get(); 
     } 

     result.set(sum); 
     context.write(key, result); 
    } 
    } 

    public static class MyPartitoner extends Partitioner<Text, IntWritable>{ 

    @Override 
    public int getPartition(Text key, IntWritable value, int numPartitions) { 


     return Math.abs(key.hashCode()) % numPartitions; 
    } 
    } 

    public static class MySortComparator2 extends WritableComparator{ 

     protected MySortComparator2() { 
      super(); 
      } 

     @SuppressWarnings({ "rawtypes" }) 
    @Override 
     public int compare(WritableComparable w1,WritableComparable w2){ 

      return 0; 
     } 
    } 

    public static void main(String[] args) throws Exception { 
    Configuration conf = new Configuration(); 
    Job job = Job.getInstance(conf, "word count"); 
    job.setJarByClass(WordCount.class); 
    job.setSortComparatorClass(MySortComparator2.class); 
    job.setMapperClass(TokenizerMapper.class); 
    job.setCombinerClass(IntSumReducer.class); 
    job.setReducerClass(IntSumReducer.class); 
    job.setOutputKeyClass(Text.class); 
    job.setOutputValueClass(IntWritable.class); 
    FileInputFormat.addInputPath(job, new Path(args[0])); 
    FileOutputFormat.setOutputPath(job, new Path(args[1])); 
    System.exit(job.waitForCompletion(true) ? 0 : 1); 
    } 
} 

, но когда я исполню это я получаю эту ошибку

Error: java.lang.NullPointerException 
    at org.apache.hadoop.io.WritableComparator.compare(WritableComparator.java:157) 
    at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.compare(MapTask.java:1265) 
    at org.apache.hadoop.util.QuickSort.fix(QuickSort.java:35) 
    at org.apache.hadoop.util.QuickSort.sortInternal(QuickSort.java:87) 
    at org.apache.hadoop.util.QuickSort.sort(QuickSort.java:63) 
    at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.sortAndSpill(MapTask.java:1593) 
    at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.flush(MapTask.java:1482) 
    at org.apache.hadoop.mapred.MapTask$NewOutputCollector.close(MapTask.java:720) 
    at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:790) 
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:341) 
    at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:163) 
    at java.security.AccessController.doPrivileged(Native Method) 
    at javax.security.auth.Subject.doAs(Subject.java:415) 
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1628) 
    at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:158) 

Мой пользовательский класс SortComparator выглядит хорошо для меня. После того, как отображение выполнено, метод сравнения MySortComparator2 должен принимать «текстовые» ключи в качестве входных данных, и поскольку я возвращаюсь 0, сортировка не будет выполнена. Это то, что я ожидал увидеть/наблюдать. Я последовал за эти учебники

http://codingjunkie.net/secondary-sort/

http://blog.zaloni.com/secondary-sorting-in-hadoop

http://www.bigdataspeak.com/2013/02/hadoop-how-to-do-secondary-sort-on_25.html

Заранее спасибо, я был бы признателен за помощь.

ответ

3

Вы должны реализовать/переопределить этот метод, тоже:

public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) { 
    // per your desired no-sort logic 
    return 0; 
} 

Я думаю, что ваш компаратор строится таким образом, что переменные, указанные в супер реализации равны нулю (и это метод это называется в поддержку такого рода - не метод, который вы написали выше). Вот почему вы получаете исключение нулевого указателя. Переопределяя метод с реализацией, которая не использует переменные, вы можете избежать исключения.

0

Как сказал Крис Геркен, вы должны переопределить этот метод при расширении WritableComparator или реализовать RawComparator вместо WritableComparator.

public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) { 
    return 0; 
} 

и, как вы сказали, что вы хотели видеть не сортировки не будет сделано, но если вы возвращаете 0, что означает, что каждый раз, когда MapReduce пытается сортировать/сравнить это видит каждый ключ, как одно и то же так, вы получите только один ключ, пара значений, которая будет первым ключом в задаче карты, которая будет завершена первой, и значением с количеством слов во входном файле. Надеюсь, вы понимаете, что я говорю. Если вход что-то вроде этого

why are rockets cylindrical 

ваш уменьшить выход будет

why 4 

, поскольку он предполагает все, как тот же ключ. Надеюсь, это поможет.

4

На самом деле есть проблема с MySortComparator2 конструктор. Код должен выглядит

protected MySortComparator2() { 
     super(Text.class, true); 
} 

где первый параметр является ключом класса и значение второго параметра обеспечивает WritableComparator конкретизируется таким образом, что WritableComparator.compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) может вызвать MySortComparator2.compare(WritableComparable a, WritableComparable b)

Смежные вопросы