2012-01-05 2 views
15

У меня есть обычный текстовый файл с возможными миллионами строк, для которого требуется индивидуальный синтаксический анализ, и я хочу как можно быстрее загрузить его в таблицу HBase (используя клиент Hadoop или HBase Java).Что является самым быстрым способом массовой загрузки данных в HBase программно?

Мое настоящее решение основано на MapReduce Работа без части Уменьшить. Я использую FileInputFormat для чтения текстового файла, чтобы каждая строка передавалась методу map моего класса Mapper. В этот момент линия анализируется с образованием объекта Put, который записывается в context. Затем TableOutputFormat принимает объект Put и вставляет его в таблицу.

Это решение дает среднюю скорость ввода 1000 рядов в секунду, что меньше, чем я ожидал. Моя настройка HBase находится в псевдораспределенном режиме на одном сервере.

Интересно, что при введении 1 000 000 строк 25 экземпляров (задач) порождаются, но они запускаются последовательно (один за другим); это нормально?

Вот код для моего текущего решения:

public static class CustomMap extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> { 

    protected void map(LongWritable key, Text value, Context context) throws IOException { 
     Map<String, String> parsedLine = parseLine(value.toString()); 

     Put row = new Put(Bytes.toBytes(parsedLine.get(keys[1]))); 
     for (String currentKey : parsedLine.keySet()) { 
      row.add(Bytes.toBytes(currentKey),Bytes.toBytes(currentKey),Bytes.toBytes(parsedLine.get(currentKey))); 
     } 

     try { 
      context.write(new ImmutableBytesWritable(Bytes.toBytes(parsedLine.get(keys[1]))), row); 
     } catch (InterruptedException e) { 
      // TODO Auto-generated catch block 
      e.printStackTrace(); 
     } 
    } 
} 

public int run(String[] args) throws Exception { 
    if (args.length != 2) { 
     return -1; 
    } 

    conf.set("hbase.mapred.outputtable", args[1]); 

    // I got these conf parameters from a presentation about Bulk Load 
    conf.set("hbase.hstore.blockingStoreFiles", "25"); 
    conf.set("hbase.hregion.memstore.block.multiplier", "8"); 
    conf.set("hbase.regionserver.handler.count", "30"); 
    conf.set("hbase.regions.percheckin", "30"); 
    conf.set("hbase.regionserver.globalMemcache.upperLimit", "0.3"); 
    conf.set("hbase.regionserver.globalMemcache.lowerLimit", "0.15"); 

    Job job = new Job(conf); 
    job.setJarByClass(BulkLoadMapReduce.class); 
    job.setJobName(NAME); 
    TextInputFormat.setInputPaths(job, new Path(args[0])); 
    job.setInputFormatClass(TextInputFormat.class); 
    job.setMapperClass(CustomMap.class); 
    job.setOutputKeyClass(ImmutableBytesWritable.class); 
    job.setOutputValueClass(Put.class); 
    job.setNumReduceTasks(0); 
    job.setOutputFormatClass(TableOutputFormat.class); 

    job.waitForCompletion(true); 
    return 0; 
} 

public static void main(String[] args) throws Exception { 
    Long startTime = Calendar.getInstance().getTimeInMillis(); 
    System.out.println("Start time : " + startTime); 

    int errCode = ToolRunner.run(HBaseConfiguration.create(), new BulkLoadMapReduce(), args); 

    Long endTime = Calendar.getInstance().getTimeInMillis(); 
    System.out.println("End time : " + endTime); 
    System.out.println("Duration milliseconds: " + (endTime-startTime)); 

    System.exit(errCode); 
} 
+0

Я предполагаю, что вы хотели, чтобы ваш титул был «массовым», а не «bluk load» ... но дайте мне знать, была ли неправильная коррекция. :-) –

+0

Вы читали это? http://hbase.apache.org/docs/r0.89.20100621/bulk-loads.html –

+0

Также, вы предварительно разделили свои регионы? Если нет, у вас в основном есть однопоточный писатель, который бы объяснил это. Вы в основном получаете одного автора за регион. –

ответ

16

Я прошел через процесс, который, вероятно, очень похожи на ваши пытать найти эффективный способ загрузки данных из MR в HBase. То, что я нашел для работы, использует HFileOutputFormat как OutputFormatClass MR.

Ниже приводится код моего кода, который я должен создать job и функцию Mapper map, которая записывает данные. Это было быстро. Мы больше не используем его, поэтому у меня нет номеров на руках, но это было около 2,5 миллионов записей за минуту.

Здесь (урезанный) функция я написал, чтобы создать работу для моего процесса MapReduce поместить данные в HBase

private Job createCubeJob(...) { 
    //Build and Configure Job 
    Job job = new Job(conf); 
    job.setJobName(jobName); 
    job.setMapOutputKeyClass(ImmutableBytesWritable.class); 
    job.setMapOutputValueClass(Put.class); 
    job.setMapperClass(HiveToHBaseMapper.class);//Custom Mapper 
    job.setJarByClass(CubeBuilderDriver.class); 
    job.setInputFormatClass(TextInputFormat.class); 
    job.setOutputFormatClass(HFileOutputFormat.class); 

    TextInputFormat.setInputPaths(job, hiveOutputDir); 
    HFileOutputFormat.setOutputPath(job, cubeOutputPath); 

    Configuration hConf = HBaseConfiguration.create(conf); 
    hConf.set("hbase.zookeeper.quorum", hbaseZookeeperQuorum); 
    hConf.set("hbase.zookeeper.property.clientPort", hbaseZookeeperClientPort); 

    HTable hTable = new HTable(hConf, tableName); 

    HFileOutputFormat.configureIncrementalLoad(job, hTable); 
    return job; 
} 

Это моя карта функции от HiveToHBaseMapper класса (слегка отредактированный).

public void map(WritableComparable key, Writable val, Context context) 
     throws IOException, InterruptedException { 
    try{ 
     Configuration config = context.getConfiguration(); 
     String[] strs = val.toString().split(Constants.HIVE_RECORD_COLUMN_SEPARATOR); 
     String family = config.get(Constants.CUBEBUILDER_CONFIGURATION_FAMILY); 
     String column = strs[COLUMN_INDEX]; 
     String Value = strs[VALUE_INDEX]; 
     String sKey = generateKey(strs, config); 
     byte[] bKey = Bytes.toBytes(sKey); 
     Put put = new Put(bKey); 
     put.add(Bytes.toBytes(family), Bytes.toBytes(column), (value <= 0) 
         ? Bytes.toBytes(Double.MIN_VALUE) 
         : Bytes.toBytes(value)); 

     ImmutableBytesWritable ibKey = new ImmutableBytesWritable(bKey); 
     context.write(ibKey, put); 

     context.getCounter(CubeBuilderContextCounters.CompletedMapExecutions).increment(1); 
    } 
    catch(Exception e){ 
     context.getCounter(CubeBuilderContextCounters.FailedMapExecutions).increment(1);  
    } 

} 

Я уверен, что это не будет решением Копировать & Паста для вас. Очевидно, что данные, с которыми я работал здесь, не нуждались в какой-либо пользовательской обработке (это было сделано в задании MR до этого). Главное, что я хочу выделить из этого, - HFileOutputFormat. Остальное - всего лишь пример того, как я его использовал. :)
Надеюсь, это поможет вам найти правильный путь к хорошему решению. :

+1

Я пробовал использовать 'HfileOutputFormat' в моем коде, но я продолжаю получать ниже исключения, любые идеи? 'java.lang.ClassCastException: org.apache.hadoop.hbase.client.Put не может быть приведен к org.apache.hadoop.hbase.KeyValue \t в org.apache.hadoop.hbase.mapreduce.HFileOutputFormat $ 1.Запись (HFileOutputFormat.java:82) \t в org.apache.hadoop.mapred.ReduceTask $ NewTrackingRecordWriter.write (ReduceTask.java:508) \t в org.apache.hadoop.mapreduce.TaskInputOutputContext.write (TaskInputOutputContext.java:80) \t at org.apache.hadoop.mapreduce.Reducer.reduce (Reducer.java:156) \t ... ' –

+0

@kramer Больше, чем пытаться« писать »в другом виде, чем ожидалось (отсюда и действие ошибка) не совсем. Нужно будет увидеть код, чтобы сделать снимок. – Nija

+0

Является ли HFileOutputFormat быстрее, чем TableOutputFormat? Учитывая равную ситуацию с разбиением областей. –

0

Интересно то, что при введении 1 000 000 строк 25 экземпляров (задач) порождаются, но они запускаются поочередно (один за другим); это нормально?

mapreduce.tasktracker.map.tasks.maximum параметр, который по умолчанию равен 2, определяет максимальное количество задач, которые могут выполняться параллельно на узле. Если не изменено, вы должны увидеть 2 задачи карты, выполняемые одновременно на каждом узле.

+0

Пробовал, но результат не изменился. –

+0

Где вы указали параметр? Он должен быть указан в файле mapred-site.xml на всех узлах до запуска демонов Hadoop. Проверьте это [документация] (http://wiki.apache.org/hadoop/FAQ#I_see_a_maximum_of_2_maps.2BAC8-reduces_spawned_concurrently_on_each_TaskTracker.2C_how_do_I_increase_that.3F). Как вы это подтвердили? Может быть проверена с помощью Web-консоли JobTracker. –

Смежные вопросы