2015-03-15 2 views
0

Мой набор данных имеет следующий формат:Hadoop не понимает Композитные ключи равны

userID mediaID rating 

я хотел бы, чтобы найти совместное вхождение любой пары mediaIDs, которые получают рейтинг выше порога во всех пользователи. Для этого я выполнил несколько примеров для реализации составного ключа. Я написал класс PairKey, который хранит уникальную пару, реализуемый СотрагеТо и перекрытый хэш-код и составляет ...

public static class PairKey implements WritableComparable<PairKey> { 

    private Integer lowID; 
    private Integer highID; 


    public PairKey() { 

     this.lowID = -1; 
     this.highID = -1; 

    } 

    public PairKey(Integer one, Integer two) { 
     //should be impossible 
     if (one.equals(two)) { 
      throw new IllegalArgumentException("Cannot have a pair key with identical IDs"); 
     } 
     if (one < two) { 
      lowID = one; 
      highID = two; 
     } 
     else { 
      lowID = two; 
      highID = one; 
     } 
    } 

    public Integer getLowID() { 
     return lowID; 
    } 

    public Integer getHighID() { 
     return highID; 
    } 

    public void setLowID(Integer _lowID) { 
     lowID = _lowID; 
    } 

    public void setHighID(Integer _highID) { 
     highID = _highID; 
    } 

    @Override 
    public int compareTo(PairKey other) { 
     int _lowCompare = lowID.compareTo(other.getLowID()); 
     if (_lowCompare != 0) { 
      return _lowCompare; 
     } 
     int _highCompare = highID.compareTo(other.getHighID()); 
     return _highCompare; 
    } 

    @Override 
    public void write(DataOutput dataOutput) throws IOException { 
     dataOutput.writeInt(lowID.intValue()); 
     dataOutput.writeInt(highID.intValue()); 
    } 

    @Override 
    public void readFields(DataInput dataInput) throws IOException { 
     lowID = new Integer(dataInput.readInt()); 
     highID = new Integer(dataInput.readInt()); 
    } 

    @Override 
    public String toString() { 
     return "<" + lowID + ", " + highID + ">"; 
    } 

    @Override 
    public boolean equals(Object o) { 

     if (this == o) { 
      return true; 
     } 
     if (o == null || this.getClass() != o.getClass()) { 
      return false; 
     } 

     PairKey other = (PairKey) o; 

     //compare fields 
     if (this.lowID != null ? this.lowID.equals(other.getLowID()) == false : other.getLowID() != null) return false; 
     if (this.highID != null ? this.highID.equals(other.getHighID()) == false : other.getHighID() != null) return false; 

     return true; 
    } 


    @Override 
    public int hashCode() { 
     int _lowHash = this.lowID.hashCode(); 
     int _highHash = this.highID.hashCode(); 
     return 163 * (_lowHash) + _highHash; 
    } 
} 

Вот мой код картографа, я храню все movieIDs, которые прошли порог в наборе для каждого пользователя, а затем испускают все возможные пары в этом наборе:

public static class PairMapper extends Mapper<Text, Text, PairKey, IntWritable> { 

    private Map<Integer, SortedSet<Integer>> temp = new HashMap<Integer, SortedSet<Integer>>(); 
    private IntWritable one = new IntWritable(1); 
    private PairKey _key = new PairKey(); 

    public void map(Text key, Text value, Context context) throws IOException, InterruptedException { 
     Integer userID = new Integer(key.toString()); 
     String[] vals = value.toString().split("\t"); 
     String _movieID = vals[0]; 
     String _rating = vals[1]; 
     Integer movieID = new Integer(_movieID); 
     Integer rating = new Integer(_rating); 
     if (rating > 3) { 
      SortedSet candidates = temp.get(userID); 
      if (candidates == null) { 
       candidates = new TreeSet<Integer>(); 
      } 
      candidates.add(movieID); 
      temp.put(userID, candidates); 

     } 
    }//map 

    public void cleanup(Context context) throws IOException, InterruptedException { 

     for (Map.Entry<Integer, SortedSet<Integer>> e : temp.entrySet()) { 

      SortedSet<Integer> _set = e.getValue(); 
      Integer [] arr = _set.toArray(new Integer[_set.size()]); 
      for (int i = 0 ; i < arr.length-1 ; i++) { 
       for (int j = i+1 ; j < arr.length ; j++) { 
        _key.setLowID(arr[i]); 
        _key.setHighID(arr[j]); 
        context.write(_key, one); 
       }//for j 

      }//for i 




     } 



    }//cleanup 



}//PairMapper 

Это мой редуктор:

public static class PairReducer extends Reducer<PairKey, Iterable<IntWritable>, Text, IntWritable> { 

    public void reduce(PairKey key, Iterable<IntWritable> vals, Context context) throws IOException, InterruptedException { 
     int sum = 0; 
     for (IntWritable val : vals) { 
      sum+= val.get(); 
     }//for 
     IntWritable result = new IntWritable(sum); 
     context.write(new Text(key.toString()), result); 
    } //reduce 

} 

и это мой водитель основной метод:

public static void main(String[] args) throws Exception { 
    Configuration conf = new Configuration(); 
    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs(); 

    if (otherArgs.length != 2) { 
     System.err.println("Usage: moviepairs <in> <out>"); 
     System.exit(2); 
    } 

    //CONFIGURE THE JOB 
    Job job = new Job(conf, "movie pairs"); 

    job.setJarByClass(MoviePairs.class); 

    job.setSortComparatorClass(CompositeKeyComparator.class); 
    job.setPartitionerClass(NaturalKeyPartitioner.class); 
    job.setGroupingComparatorClass(NaturalKeyGroupingComparator.class); 

    //map-reduce classes 
    job.setMapperClass(PairMapper.class); 
    job.setCombinerClass(PairReducer.class); 
    job.setReducerClass(PairReducer.class); 


    //key-val classes 
    job.setMapOutputKeyClass(PairKey.class); 
    job.setMapOutputValueClass(IntWritable.class); 
    job.setOutputKeyClass(Text.class); 
    job.setOutputValueClass(IntWritable.class); 


    job.setInputFormatClass(KeyValueTextInputFormat.class); 
    FileInputFormat.addInputPath(job, new Path(otherArgs[0])); 
    FileOutputFormat.setOutputPath(job, new Path(otherArgs[1])); 

    System.exit(job.waitForCompletion(true)? 0 :1); 

} 

Я ожидаю, чтобы получить это в моем редукторе:

pair <1,2>: [1,1,1] 

, но вместо того, чтобы редуктор не кажется, понимают равенство пар. Вместо этого:

pair<1,2>: [1] 
pair<1,2>: [1] 
pair<1,2>: [1] 

Не уверен, что мне там не хватает. Как вы можете видеть, я пробовал несколько вещей, таких как добавление настраиваемого сортировщика (который, как я полагаю, мне не нужен, и использование компаратора группировки, пользовательский разделитель), но я думаю, что просто переопределить hashcode/equals следует учитывать это? (не уверен). Все примеры, которые я нашел в Интернете, похоже, следуют этому, и все они, похоже, работают.

ответ

0

Как обычно с этими вопросами, проблема была совершенно неактуальной. Я испортил интерфейс Reducer. Вместо <KEYIN, VALIN...> Я делал <KEYIN, ITERABLE<VALIN>....>