2015-07-05 2 views
0

Я новичок в Hadoop и пишу свою первую программу, чтобы присоединиться к следующим двум таблицам в MapReduce.Стол объединяется в MapReduce - Hadoop

Первая таблица:

11111 John 
22222 Robert 
33333 Stephan 
44444 Peter 
55555 Andersen 

Вторая таблица:

11111 Washington EEE 2011 
22222 Jacksonville EIE 2010 
33333 Minneapolis ECE 2012 
44444 Cheyenne  CSE 2013 
55555 Detroit  IT 2014 

Я загрузил выше двух текстовых файлов в HDFS с помощью Hue. Между каждым столбцом есть вкладка.
После запуска кода, я получаю неожиданный выход следующим образом:

11111 John Washington EEE 2011   
22222 Jacksonville EIE 2010  Robert 
33333 Stephan Minneapolis ECE 2012   
44444 Cheyenne CSE 2013  Peter 
55555 Andersen Detroit  IT 2014 

Я не мог понять, что случилось с моим кодом. Вот мой Java-код:

DriverClass:

public class DriverClass extends Configured{ 

public static void main (String args[]) throws IOException, ClassNotFoundException, InterruptedException{ 

    Job job = new Job(); 

    job.setJarByClass(DriverClass.class); 


    MultipleInputs.addInputPath(job, new Path(args[0]), TextInputFormat.class, MapperClassOne.class); 
    MultipleInputs.addInputPath(job, new Path(args[1]), TextInputFormat.class, MapperClassTwo.class); 
    FileOutputFormat.setOutputPath(job, new Path(args[2])); 

    job.setReducerClass(ReducerClass.class); 

    job.setMapOutputKeyClass(Text.class); 
    job.setMapOutputValueClass(Text.class); 

    job.setOutputKeyClass(Text.class); 
    job.setOutputValueClass(Text.class); 

    System.exit(job.waitForCompletion(true)? 0 : -1); 

} 
} 

MapperClass для моего первого набора данных (первая таблица) - MapperClassOne:

public class MapperClassOne extends Mapper<LongWritable, Text, Text, Text>{ 

public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException{ 

    String[] line = value.toString().split("\t"); 


    context.write(new Text(line[0]), new Text(line[1])); 
} 
} 

MapperClass для моего второго набора данных (вторая таблица) - MapperClassTwo:

public class MapperClassTwo extends Mapper<LongWritable, Text, Text, Text>{ 

public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException{ 

    String[] line = value.toString().split("\t"); 

    String temp = ""; 
    for(int i=1; i<line.length; i++){ 
     temp += line[i] + "\t"; 
    } 

    context.write(new Text(line[0]), new Text(temp)); 
} 
} 

ReducerClass:

public class ReducerClass extends Reducer<Text, Text, Text, Text>{ 

public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException{ 

    Iterator<Text> iter = values.iterator(); 

    String temp = ""; 
    while(iter.hasNext()){ 
     temp += iter.next().toString() + "\t"; 
    } 

    context.write(key, new Text(temp)); 

} 
} 

Пожалуйста, помогите мне, а также предложите мне, если есть лучший способ выполнить соединение таблицы.

ответ

0

В редукторе значения ключа не сортируются, если вы не реализуете вторичную сортировку. При текущей реализации значение для ключа может быть в произвольном порядке. Вам нужно добавить идентификатор к значениям вашего картографа, чтобы определить источник значений для ключа в редукторе.

См: http://kickstarthadoop.blogspot.com/2011/09/joins-with-plain-map-reduce.html http://www.lichun.cc/blog/2012/05/hadoop-genericwritable-sample-usage/

0

Здесь у вас есть карта уменьшить внутреннее соединение примера закодированный в Python вы можете получить вдохновение от:

https://github.com/jpgerek/mapreduce-exercises/blob/master/inner-join-two-tables.py

В том же проекте у вас есть несколько примеров для других алгоритмов.

Они используют модуль python под названием mredu, предназначенный для образовательной цели.

Отказ от ответственности: Я написал код этих примеров.