Я новичок в Hadoop и пишу свою первую программу, чтобы присоединиться к следующим двум таблицам в MapReduce.Стол объединяется в MapReduce - Hadoop
Первая таблица:
11111 John
22222 Robert
33333 Stephan
44444 Peter
55555 Andersen
Вторая таблица:
11111 Washington EEE 2011
22222 Jacksonville EIE 2010
33333 Minneapolis ECE 2012
44444 Cheyenne CSE 2013
55555 Detroit IT 2014
Я загрузил выше двух текстовых файлов в HDFS с помощью Hue. Между каждым столбцом есть вкладка.
После запуска кода, я получаю неожиданный выход следующим образом:
11111 John Washington EEE 2011
22222 Jacksonville EIE 2010 Robert
33333 Stephan Minneapolis ECE 2012
44444 Cheyenne CSE 2013 Peter
55555 Andersen Detroit IT 2014
Я не мог понять, что случилось с моим кодом. Вот мой Java-код:
DriverClass:
public class DriverClass extends Configured{
public static void main (String args[]) throws IOException, ClassNotFoundException, InterruptedException{
Job job = new Job();
job.setJarByClass(DriverClass.class);
MultipleInputs.addInputPath(job, new Path(args[0]), TextInputFormat.class, MapperClassOne.class);
MultipleInputs.addInputPath(job, new Path(args[1]), TextInputFormat.class, MapperClassTwo.class);
FileOutputFormat.setOutputPath(job, new Path(args[2]));
job.setReducerClass(ReducerClass.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
System.exit(job.waitForCompletion(true)? 0 : -1);
}
}
MapperClass для моего первого набора данных (первая таблица) - MapperClassOne:
public class MapperClassOne extends Mapper<LongWritable, Text, Text, Text>{
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException{
String[] line = value.toString().split("\t");
context.write(new Text(line[0]), new Text(line[1]));
}
}
MapperClass для моего второго набора данных (вторая таблица) - MapperClassTwo:
public class MapperClassTwo extends Mapper<LongWritable, Text, Text, Text>{
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException{
String[] line = value.toString().split("\t");
String temp = "";
for(int i=1; i<line.length; i++){
temp += line[i] + "\t";
}
context.write(new Text(line[0]), new Text(temp));
}
}
ReducerClass:
public class ReducerClass extends Reducer<Text, Text, Text, Text>{
public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException{
Iterator<Text> iter = values.iterator();
String temp = "";
while(iter.hasNext()){
temp += iter.next().toString() + "\t";
}
context.write(key, new Text(temp));
}
}
Пожалуйста, помогите мне, а также предложите мне, если есть лучший способ выполнить соединение таблицы.