2012-06-16 3 views
8

У меня есть работа с haddop, ее вывод должен быть записан в HBase. Мне действительно не нужен редуктор, тип строки, которую я хотел бы вставить, определяется в Mapper.Hadoop - Запись в HBase непосредственно из Mapper

Как я могу использовать TableOutputFormat для достижения этого? Из всех примеров я видел предположение, что редуктор является тем, который создает Put, и что TableMapper предназначен только для чтения из таблицы HBase.

В моем случае вход HDFS выводится на конкретную таблицу, я не могу найти что-либо в TableMapReduceUtil, что может помочь мне в этом.

Есть ли какой-нибудь пример, который может мне помочь?

BTW, я использую новый Hadoop API

+0

Сколько записей вы пытаетесь вставить? – Gevorg

ответ

1

Вам просто нужно сделать вывод сопоставителя пары. OutputFormat указывает только, как сохранить выходные значения ключа. Это не обязательно означает, что ключевые значения исходят от редуктора. Вы должны были бы сделать что-то подобное в картографа:

... extends TableMapper<ImmutableBytesWritable, Put>() { 
    ... 
    ... 
    context.write(<some key>, <some Put or Delete object>); 
} 
7

Это пример чтения из файла и поместить все строки в Hbase. Этот пример из «Hbase: окончательное руководство», и вы можете найти его в репозитории. Для того, чтобы получить это просто клонировать репо на вашем компьютере:

git clone git://github.com/larsgeorge/hbase-book.git 

В этой книге вы также можете найти все объяснения по поводу кода. Но если что-то непонятно для вас, не стесняйтесь спрашивать.

` public class ImportFromFile { 
    public static final String NAME = "ImportFromFile"; 
    public enum Counters { LINES } 

    static class ImportMapper 
    extends Mapper<LongWritable, Text, ImmutableBytesWritable, Writable> { 
     private byte[] family = null; 
     private byte[] qualifier = null; 

     @Override 
     protected void setup(Context context) 
     throws IOException, InterruptedException { 
     String column = context.getConfiguration().get("conf.column"); 
     byte[][] colkey = KeyValue.parseColumn(Bytes.toBytes(column)); 
     family = colkey[0]; 
     if (colkey.length > 1) { 
      qualifier = colkey[1]; 
     } 
     } 

     @Override 
     public void map(LongWritable offset, Text line, Context context) 
     throws IOException { 
      try { 
      String lineString = line.toString(); 
      byte[] rowkey = DigestUtils.md5(lineString); 
      Put put = new Put(rowkey); 
      put.add(family, qualifier, Bytes.toBytes(lineString)); 
      context.write(new ImmutableBytesWritable(rowkey), put); 
      context.getCounter(Counters.LINES).increment(1); 
     } catch (Exception e) { 
      e.printStackTrace(); 
     } 
     } 
    } 

    private static CommandLine parseArgs(String[] args) throws ParseException { 
     Options options = new Options(); 
     Option o = new Option("t", "table", true, 
     "table to import into (must exist)"); 
     o.setArgName("table-name"); 
     o.setRequired(true); 
     options.addOption(o); 
     o = new Option("c", "column", true, 
     "column to store row data into (must exist)"); 
     o.setArgName("family:qualifier"); 
     o.setRequired(true); 
     options.addOption(o); 
     o = new Option("i", "input", true, 
     "the directory or file to read from"); 
     o.setArgName("path-in-HDFS"); 
     o.setRequired(true); 
     options.addOption(o); 
     options.addOption("d", "debug", false, "switch on DEBUG log level"); 
     CommandLineParser parser = new PosixParser(); 
     CommandLine cmd = null; 
     try { 
     cmd = parser.parse(options, args); 
     } catch (Exception e) { 
     System.err.println("ERROR: " + e.getMessage() + "\n"); 
     HelpFormatter formatter = new HelpFormatter(); 
     formatter.printHelp(NAME + " ", options, true); 
     System.exit(-1); 
     } 
     return cmd; 
    } 

    public static void main(String[] args) throws Exception { 
     Configuration conf = HBaseConfiguration.create(); 
     String[] otherArgs = 
     new GenericOptionsParser(conf, args).getRemainingArgs(); 
     CommandLine cmd = parseArgs(otherArgs); 
     String table = cmd.getOptionValue("t"); 
     String input = cmd.getOptionValue("i"); 
     String column = cmd.getOptionValue("c"); 
     conf.set("conf.column", column); 
     Job job = new Job(conf, "Import from file " + input + " into table " + table); 

      job.setJarByClass(ImportFromFile.class); 
     job.setMapperClass(ImportMapper.class); 
     job.setOutputFormatClass(TableOutputFormat.class); 
     job.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, table); 
     job.setOutputKeyClass(ImmutableBytesWritable.class); 
     job.setOutputValueClass(Writable.class); 
     job.setNumReduceTasks(0); 
     FileInputFormat.addInputPath(job, new Path(input)); 
     System.exit(job.waitForCompletion(true) ? 0 : 1); 
    } 
    }` 
+1

Я получаю следующее: «Исключение из контейнера-запуска: org.apache.hadoop.util.Shell $ ExitCodeException». Вы столкнулись с этой проблемой также с помощью кода выше? Я использую Hadoop2.4 и Hbase0.94.18 – Gevorg

Смежные вопросы