2014-12-11 5 views
0

Во-первых, я новичок в Hadoop MapReduce. Мой редуктор не работает, но показывает, что работа успешно завершена. Ниже моя консоль выход:Сокращение не запускается, но работа успешно завершена.

    • INFO mapreduce.Job: Выполнение работы: job_1418240815217_0015
    • INFO mapreduce.Job: Работа job_1418240815217_0015 работает в режиме убер: ложные
    • INFO mapreduce.Job: карта 0% снижение 0%
    • INFO mapreduce.Job: карта 100% 0% уменьшить
    • INFO mapreduce.Job: Работа job_1418240815217_0015 успешно завершена
    • INFO MapReduce. Работа: Счётчики: 30

Основной класс:

public static void main(String[] args) throws Exception { 
    Configuration conf = new Configuration(); 
    @SuppressWarnings("deprecation") 
    Job job = new Job(conf,"NPhase2"); 

    job.setJarByClass(NPhase2.class); 

    job.setMapOutputKeyClass(IntWritable.class); 
    job.setMapOutputValueClass(NPhase2Value.class); 
    job.setOutputKeyClass(NullWritable.class); 
    job.setOutputValueClass(Text.class);   

    job.setMapperClass(MapClass.class);   
    job.setReducerClass(Reduce.class); 

    int numberOfPartition = 0; 
    List<String> other_args = new ArrayList<String>(); 

    for(int i = 0; i < args.length; ++i) 
    { 
     try { 
      if ("-m".equals(args[i])) { 
       //conf.setNumMapTasks(Integer.parseInt(args[++i])); 
       ++i; 
      } else if ("-r".equals(args[i])) { 
       job.setNumReduceTasks(Integer.parseInt(args[++i])); 
      } else if ("-k".equals(args[i])) { 
       int knn = Integer.parseInt(args[++i]); 
       conf.setInt("knn", knn); 
       System.out.println(knn); 
      } else { 
       other_args.add(args[i]); 
      } 
      job.setNumReduceTasks(numberOfPartition * numberOfPartition); 
      //conf.setNumReduceTasks(1); 
     } catch (NumberFormatException except) { 
      System.out.println("ERROR: Integer expected instead of " + args[i]); 
     } catch (ArrayIndexOutOfBoundsException except) { 
      System.out.println("ERROR: Required parameter missing from " + args[i-1]); 
     } 
    } 

    // Make sure there are exactly 2 parameters left. 
    if (other_args.size() != 2) { 
     System.out.println("ERROR: Wrong number of parameters: " + 
      other_args.size() + " instead of 2."); 
    } 

    FileInputFormat.setInputPaths(job, other_args.get(0)); 
    FileOutputFormat.setOutputPath(job, new Path(other_args.get(1))); 

    System.exit(job.waitForCompletion(true) ? 0 : 1); 
} 

Мой картографа является:

общественный статический класс MapClass расширяет Mapper {

public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException 
    { 
     String line = value.toString(); 
     String[] parts = line.split("\\s+"); 
     // key format <rid1> 
     IntWritable mapKey = new IntWritable(Integer.valueOf(parts[0])); 
     // value format <rid2, dist> 
     NPhase2Value np2v = new NPhase2Value(Integer.valueOf(parts[1]), Float.valueOf(parts[2])); 
     context.write(mapKey, np2v); 
    } 
} 

Мой класс редуктор:

public static class Reduce extends Reducer<IntWritable, NPhase2Value, NullWritable, Text> 
    { 
     int numberOfPartition; 
     int knn; 

     class Record 
    { 
     public int id2; 
     public float dist; 

     Record(int id2, float dist) 
     { 
      this.id2 = id2; 
      this.dist = dist; 
     } 

     public String toString() 
     { 
      return Integer.toString(id2) + " " + Float.toString(dist); 
     } 
    } 

    class RecordComparator implements Comparator<Record> 
    { 
     public int compare(Record o1, Record o2) 
     { 
      int ret = 0; 
      float dist = o1.dist - o2.dist; 

      if (Math.abs(dist) < 1E-6) 
       ret = o1.id2 - o2.id2; 
      else if (dist > 0) 
       ret = 1; 
      else 
       ret = -1; 

      return -ret; 
     } 
    } 

    public void setup(Context context) 
    { 
     Configuration conf = new Configuration(); 
     conf = context.getConfiguration(); 
     numberOfPartition = conf.getInt("numberOfPartition", 2);  
     knn = conf.getInt("knn", 3); 
    } 

    public void reduce(IntWritable key, Iterator<NPhase2Value> values, Context context) throws IOException, InterruptedException 
    { 
     //initialize the pq 
     RecordComparator rc = new RecordComparator(); 
     PriorityQueue<Record> pq = new PriorityQueue<Record>(knn + 1, rc); 

     // For each record we have a reduce task 
     // value format <rid1, rid2, dist> 
     while (values.hasNext()) 
     { 
      NPhase2Value np2v = values.next(); 

      int id2 = np2v.getFirst().get(); 
      float dist = np2v.getSecond().get(); 
      Record record = new Record(id2, dist); 
      pq.add(record); 
      if (pq.size() > knn) 
       pq.poll(); 
     } 

     while(pq.size() > 0) 
     { 
      context.write(NullWritable.get(), new Text(key.toString() + " " + pq.poll().toString())); 
      //break; // only ouput the first record 
     } 

    } // reduce 
} 

Это мой вспомогательный класс:

общественного класса NPhase2Value реализует WritableComparable {

private IntWritable first; 
private FloatWritable second; 

public NPhase2Value() { 
    set(new IntWritable(), new FloatWritable()); 
} 

public NPhase2Value(int first, float second) { 
    set(new IntWritable(first), new FloatWritable(second)); 
} 

public void set(IntWritable first, FloatWritable second) { 
    this.first = first; 
    this.second = second; 
} 

public IntWritable getFirst() { 
    return first; 
} 

public FloatWritable getSecond() { 
    return second; 
} 

@Override 
public void write(DataOutput out) throws IOException { 
    first.write(out); 
    second.write(out); 
} 

@Override 
public void readFields(DataInput in) throws IOException { 
    first.readFields(in); 
    second.readFields(in); 
} 

@Override 
public boolean equals(Object o) { 
    if (o instanceof NPhase2Value) { 
     NPhase2Value np2v = (NPhase2Value) o; 
     return first.equals(np2v.first) && second.equals(np2v.second); 
    } 
    return false; 
} 

@Override 
public String toString() { 
    return first.toString() + " " + second.toString(); 
} 

@Override 
public int compareTo(NPhase2Value np2v) { 
    return 1; 
} 

} 

Команда командной строки я использую:

hadoop jar knn.jar NPhase2 -m 1 -r 3 -k 4 phase1out phase2out 

Я пытаюсь трудно понять ошибку но все еще не в состоянии придумать решение. Пожалуйста, помогите мне в этом отношении, поскольку я бегу по жесткому графику.

ответ

0

Потому что вы установили количество редуктора задачи как 0. Смотрите это:

int numberOfPartition = 0; 
//....... 
job.setNumReduceTasks(numberOfPartition * numberOfPartition); 

Я не вижу, что вы сброшены numberOfPartition где-нибудь в вашем коде. Я хочу, чтобы вы установили его, когда вы разбираете параметр -r или удаляете вызов метода setNumReduceTasks, как описано выше, так как вы уже устанавливаете его во время разбора параметра -r.

+0

Как я пропустил это. спасибо всем Алмасу. –

Смежные вопросы