2011-01-20 4 views
3

Недавно я начал использовать Hadoop, и у меня возникла проблема при использовании Mapfile в качестве ввода задания MapReduce.Mapfile как вход в задание MapReduce

Следующий рабочий код, пишет простой MapFile, называемый «TestMap» в hdfs, где есть три ключа типа Text и три значения типа BytesWritable.

Здесь содержание TestMap:

$ hadoop fs -text /user/hadoop/TestMap/data 
11/01/20 11:17:58 INFO util.NativeCodeLoader: Loaded the native-hadoop library 
11/01/20 11:17:58 INFO zlib.ZlibFactory: Successfully loaded & initialized native-zlib library 
11/01/20 11:17:58 INFO compress.CodecPool: Got brand-new decompressor 
A 01 
B 02 
C 03 

Вот это программа, которая создает TestMap файле проекта:

import java.io.IOException; 
import org.apache.hadoop.conf.Configuration; 
import org.apache.hadoop.fs.FileSystem; 
import org.apache.hadoop.io.MapFile; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.io.BytesWritable; 
import org.apache.hadoop.io.SequenceFile; 
import org.apache.hadoop.io.IOUtils; 

public class CreateMap { 

    public static void main(String[] args) throws IOException{ 

     Configuration conf = new Configuration(); 
     FileSystem hdfs = FileSystem.get(conf); 

     Text key = new Text(); 
     BytesWritable value = new BytesWritable(); 
     byte[] data = {1, 2, 3}; 
     String[] strs = {"A", "B", "C"}; 
     int bytesRead; 
     MapFile.Writer writer = null; 

     writer = new MapFile.Writer(conf, hdfs, "TestMap", key.getClass(), value.getClass()); 
     try { 
      for (int i = 0; i < 3; i++) { 
       key.set(strs[i]); 
       value.set(data, i, 1); 
       writer.append(key, value); 
       System.out.println(strs[i] + ":" + data[i] + " added."); 
      } 
     } 
     catch (IOException e) { 
      e.printStackTrace(); 
     } 
     finally { 
      IOUtils.closeStream(writer); 
     } 
    } 
} 

Простой MapReduce задание, которое следует пытается увеличиваться на единицу значения файле проекта :

import java.io.IOException; 
import java.util.Iterator; 
import org.apache.hadoop.conf.Configuration; 
import org.apache.hadoop.conf.Configured; 
import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.mapred.FileInputFormat; 
import org.apache.hadoop.mapred.FileOutputFormat; 
import org.apache.hadoop.mapred.JobClient; 
import org.apache.hadoop.mapred.JobConf; 
import org.apache.hadoop.mapred.SequenceFileInputFormat; 
import org.apache.hadoop.mapred.MapReduceBase; 
import org.apache.hadoop.mapred.Mapper; 
import org.apache.hadoop.mapred.OutputCollector; 
import org.apache.hadoop.mapred.Reducer; 
import org.apache.hadoop.mapred.Reporter; 
import org.apache.hadoop.mapred.TextOutputFormat; 
import org.apache.hadoop.util.Tool; 
import org.apache.hadoop.util.ToolRunner; 
import org.apache.hadoop.io.BytesWritable; 


public class AddOne extends Configured implements Tool { 

    public static class MapClass extends MapReduceBase 

     implements Mapper<Text, BytesWritable, Text, Text> { 

     public void map(Text key, BytesWritable value, 
         OutputCollector<Text, Text> output, 
         Reporter reporter) throws IOException { 


      byte[] data = value.getBytes(); 
      data[0] += 1; 
      value.set(data, 0, 1); 
      output.collect(key, new Text(value.toString())); 
     } 
    } 

    public static class Reduce extends MapReduceBase 
     implements Reducer<Text, Text, Text, Text> { 

     public void reduce(Text key, Iterator<Text> values, 
          OutputCollector<Text, Text> output, 
          Reporter reporter) throws IOException { 

      output.collect(key, values.next()); 
     } 
    } 

    public int run(String[] args) throws Exception { 
     Configuration conf = getConf(); 

     JobConf job = new JobConf(conf, AddOne.class); 

     Path in = new Path("TestMap"); 
     Path out = new Path("output"); 
     FileInputFormat.setInputPaths(job, in); 
     FileOutputFormat.setOutputPath(job, out); 

     job.setJobName("AddOne"); 
     job.setMapperClass(MapClass.class); 
     job.setReducerClass(Reduce.class); 

     job.setInputFormat(SequenceFileInputFormat.class); 
     job.setMapOutputKeyClass(Text.class); 
     job.setMapOutputValueClass(Text.class); 

     job.setOutputFormat(TextOutputFormat.class); 
     job.setOutputKeyClass(Text.class); 
     job.setOutputValueClass(Text.class); 
     job.set("key.value.separator.in.input.line", ":"); 


     JobClient.runJob(job); 

     return 0; 
    } 

    public static void main(String[] args) throws Exception { 
     int res = ToolRunner.run(new Configuration(), new AddOne(), args); 

     System.exit(res); 
    } 
} 

Исключение, которое я получаю:

java.lang.ClassCastException: org.apache.hadoop.io.LongWritable cannot be cast to org.apache.hadoop.io.BytesWritable 
    at AddOne$MapClass.map(AddOne.java:32) 
    at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:50) 
    at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:358) 
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:307) 
    at org.apache.hadoop.mapred.Child.main(Child.java:170) 

Я не понимаю, почему Hadoop пытается бросить LongWritable, так как в моем коде я определить интерфейс Mapper правильно (Mapper<Text, BytesWritable, Text, Text>).

Может ли кто-нибудь мне помочь?

Большое спасибо

Лука

ответ

15

Ваша проблема связана с тем, что, несмотря на то, что название говорит ты, MapFile является не файл.

MapFile фактически представляет собой каталог, состоящий из двух файлов: есть файл данных, который представляет собой SequenceFile, содержащий ключи и значения, которые вы записываете в него; однако есть также файл «index», который представляет собой другой SequenceFile, содержащий подпоследовательность ключей вместе с их смещениями как LongWritables; этот индекс загружается в память с помощью MapFile.Reader, чтобы вы могли быстро выполнять двоичный поиск, чтобы найти смещение в файле данных, которое будет иметь нужные вам данные при случайном доступе.

Вы используете старый "org.apache.hadoop.mapred" version of SequenceFileInputFormat. Это не достаточно разумно, чтобы знать, только посмотреть на файл данных, когда вы скажете ему посмотреть на MapFile как на вход; вместо этого он фактически пытается использовать файл данных и индексный файл как обычные входные файлы. Файл данных будет работать правильно, потому что классы согласуются с тем, что вы указали, но индексный файл будет генерировать ClassCastException, потому что значения индексного файла - это все LongWritables.

У вас есть два варианта: вы можете начать использовать "org.apache.hadoop.mapreduce" version of SequenceFileInputFormat (таким образом изменив другие части вашего кода), который достаточно хорошо знает о MapFiles, чтобы просто посмотреть на файл данных; или вместо этого вы можете явно указать файл данных в качестве файла, который вы хотите ввести.

0

Одним из подходов может быть с помощью пользовательского InputFormat с одной записью для всего блока, файла проекта и поиском с помощью ключа от карты()

Смежные вопросы