Мой набор данных имеет следующий формат:Hadoop не понимает Композитные ключи равны
userID mediaID rating
я хотел бы, чтобы найти совместное вхождение любой пары mediaIDs, которые получают рейтинг выше порога во всех пользователи. Для этого я выполнил несколько примеров для реализации составного ключа. Я написал класс PairKey, который хранит уникальную пару, реализуемый СотрагеТо и перекрытый хэш-код и составляет ...
public static class PairKey implements WritableComparable<PairKey> {
private Integer lowID;
private Integer highID;
public PairKey() {
this.lowID = -1;
this.highID = -1;
}
public PairKey(Integer one, Integer two) {
//should be impossible
if (one.equals(two)) {
throw new IllegalArgumentException("Cannot have a pair key with identical IDs");
}
if (one < two) {
lowID = one;
highID = two;
}
else {
lowID = two;
highID = one;
}
}
public Integer getLowID() {
return lowID;
}
public Integer getHighID() {
return highID;
}
public void setLowID(Integer _lowID) {
lowID = _lowID;
}
public void setHighID(Integer _highID) {
highID = _highID;
}
@Override
public int compareTo(PairKey other) {
int _lowCompare = lowID.compareTo(other.getLowID());
if (_lowCompare != 0) {
return _lowCompare;
}
int _highCompare = highID.compareTo(other.getHighID());
return _highCompare;
}
@Override
public void write(DataOutput dataOutput) throws IOException {
dataOutput.writeInt(lowID.intValue());
dataOutput.writeInt(highID.intValue());
}
@Override
public void readFields(DataInput dataInput) throws IOException {
lowID = new Integer(dataInput.readInt());
highID = new Integer(dataInput.readInt());
}
@Override
public String toString() {
return "<" + lowID + ", " + highID + ">";
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || this.getClass() != o.getClass()) {
return false;
}
PairKey other = (PairKey) o;
//compare fields
if (this.lowID != null ? this.lowID.equals(other.getLowID()) == false : other.getLowID() != null) return false;
if (this.highID != null ? this.highID.equals(other.getHighID()) == false : other.getHighID() != null) return false;
return true;
}
@Override
public int hashCode() {
int _lowHash = this.lowID.hashCode();
int _highHash = this.highID.hashCode();
return 163 * (_lowHash) + _highHash;
}
}
Вот мой код картографа, я храню все movieIDs, которые прошли порог в наборе для каждого пользователя, а затем испускают все возможные пары в этом наборе:
public static class PairMapper extends Mapper<Text, Text, PairKey, IntWritable> {
private Map<Integer, SortedSet<Integer>> temp = new HashMap<Integer, SortedSet<Integer>>();
private IntWritable one = new IntWritable(1);
private PairKey _key = new PairKey();
public void map(Text key, Text value, Context context) throws IOException, InterruptedException {
Integer userID = new Integer(key.toString());
String[] vals = value.toString().split("\t");
String _movieID = vals[0];
String _rating = vals[1];
Integer movieID = new Integer(_movieID);
Integer rating = new Integer(_rating);
if (rating > 3) {
SortedSet candidates = temp.get(userID);
if (candidates == null) {
candidates = new TreeSet<Integer>();
}
candidates.add(movieID);
temp.put(userID, candidates);
}
}//map
public void cleanup(Context context) throws IOException, InterruptedException {
for (Map.Entry<Integer, SortedSet<Integer>> e : temp.entrySet()) {
SortedSet<Integer> _set = e.getValue();
Integer [] arr = _set.toArray(new Integer[_set.size()]);
for (int i = 0 ; i < arr.length-1 ; i++) {
for (int j = i+1 ; j < arr.length ; j++) {
_key.setLowID(arr[i]);
_key.setHighID(arr[j]);
context.write(_key, one);
}//for j
}//for i
}
}//cleanup
}//PairMapper
Это мой редуктор:
public static class PairReducer extends Reducer<PairKey, Iterable<IntWritable>, Text, IntWritable> {
public void reduce(PairKey key, Iterable<IntWritable> vals, Context context) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : vals) {
sum+= val.get();
}//for
IntWritable result = new IntWritable(sum);
context.write(new Text(key.toString()), result);
} //reduce
}
и это мой водитель основной метод:
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
if (otherArgs.length != 2) {
System.err.println("Usage: moviepairs <in> <out>");
System.exit(2);
}
//CONFIGURE THE JOB
Job job = new Job(conf, "movie pairs");
job.setJarByClass(MoviePairs.class);
job.setSortComparatorClass(CompositeKeyComparator.class);
job.setPartitionerClass(NaturalKeyPartitioner.class);
job.setGroupingComparatorClass(NaturalKeyGroupingComparator.class);
//map-reduce classes
job.setMapperClass(PairMapper.class);
job.setCombinerClass(PairReducer.class);
job.setReducerClass(PairReducer.class);
//key-val classes
job.setMapOutputKeyClass(PairKey.class);
job.setMapOutputValueClass(IntWritable.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
job.setInputFormatClass(KeyValueTextInputFormat.class);
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
System.exit(job.waitForCompletion(true)? 0 :1);
}
Я ожидаю, чтобы получить это в моем редукторе:
pair <1,2>: [1,1,1]
, но вместо того, чтобы редуктор не кажется, понимают равенство пар. Вместо этого:
pair<1,2>: [1]
pair<1,2>: [1]
pair<1,2>: [1]
Не уверен, что мне там не хватает. Как вы можете видеть, я пробовал несколько вещей, таких как добавление настраиваемого сортировщика (который, как я полагаю, мне не нужен, и использование компаратора группировки, пользовательский разделитель), но я думаю, что просто переопределить hashcode/equals следует учитывать это? (не уверен). Все примеры, которые я нашел в Интернете, похоже, следуют этому, и все они, похоже, работают.