2010-04-26 2 views
6

Я пытался использовать Hadoop для отправки N количества строк в одно сопоставление. Я не требую, чтобы линии были разделены.Несколько строк текста на одну карту

Я попытался использовать NLineInputFormat, однако это отправляет N строк текста из данных в каждый картограф по одной строке за раз [отказ после N-й строки].

Я попытался установить опцию и он принимает только N строк ввода его отправкой на 1 линии, в то время на каждой карте:

job.setInt("mapred.line.input.format.linespermap", 10); 

я нашел список рассылки рекомендовать мне переопределить LineRecordReader :: next, однако это не так просто, так как внутренние члены данных являются частными.

Я только что проверил источник для NLineInputFormat и это жесткие коды LineReader, поэтому переопределение не поможет.

Кроме того, кстати, я использую Hadoop 0.18 для совместимости с Amazon EC2 MapReduce.

+0

Почему вы пытаетесь это сделать? Разве несколько строк составляют одну запись в некотором смысле? –

+0

Мне действительно нужно N число случайных строк [как набор], однако я могу жить с последовательными. Мне нужно его, чтобы отправить его на правильный редуктор. – monksy

+0

Чтобы ответить на ваш вопрос, да, они делают. – monksy

ответ

7

Вы должны реализовать свой собственный формат ввода. Тогда у вас также есть возможность определить свой собственный записывающий ридер.

К сожалению, вам необходимо определить метод getSplits(). По-моему, это будет сложнее, чем внедрение устройства чтения записей. Этот метод должен реализовать логику для ввода входных данных.

Смотрите следующий отрывок из "Hadoop - полное руководство" (большой книги, которую я всегда рекомендую!):

Вот интерфейс:

public interface InputFormat<K, V> { 
    InputSplit[] getSplits(JobConf job, int numSplits) throws IOException; 
    RecordReader<K, V> getRecordReader(InputSplit split, 
            JobConf job, 
            Reporter reporter) throws IOException; 
} 

JobClient называет getSplits (метод) , передавая необходимое количество задач карты в качестве аргумента numSplits. Это число рассматривается как подсказка, так как InputFormat imple- ментаций могут возвращать разное количество разделов на число, указанное в numSplits. Вычисляя разрывы, клиент отправляет их в jobtracker, который использует свои места хранения для планирования задач карты для обработки их на контролерах.

На задачах слежения задача карты передает разделение на метод getRecordReader() на InputFormat для получения RecordReader для этого разделения. RecordReader - это немного больше, чем итератор по записям, а задание карты использует его для генерации пар значений ключа ключа, , которые он передает в функцию карты. Фрагмент кода (на основе коды в MapRunner) иллюстрирует идею:

K key = reader.createKey(); 
V value = reader.createValue(); 
while (reader.next(key, value)) { 
    mapper.map(key, value, output, reporter); 
} 
+0

Этот вид работы. Но это действительно не отвечает на вопрос. Существует проблема с добавлением новых InputFormats под 18.3. – monksy

+2

Хорошо, прошу прощения. На самом деле нет реального вопроса, так как я не вижу знака вопроса: -P Так что еще вам нужно знать более конкретно? –

1

Я думаю, что в вашем случае вы можете следовать схеме делегирования и реализации обертки вокруг LineRecordReader, отменяющий необходимые методы, то есть следующие() (или nextKeyValue() в новом API), чтобы установить значение для конкатенации N строк, а не одной строки.

Я искал пример реализации ParagraphRecordReader, который использует LineRecordReader для чтения входных данных по строкам (и конкатенации) до тех пор, пока не встретит EOF или пустую строку. Затем он возвращает пару, где значение - это абзац (вместо одной строки). Более того, ParagraphInputFormat для этого ParagraphRecordReader прост, как стандартный TextInputFormat.

Вы можете найти необходимые ссылки на эту реализацию и пару слов об этом следующем сообщении: http://hadoop-mapreduce.blogspot.com/2011/03/little-more-complicated-recordreaders.html.

Лучшие

2

Я решил эту проблему недавно, просто создавая свой собственный InputFormat, который переопределяет NLineInputFormat и реализует пользовательский MultiLineRecordReader вместо LineReader по умолчанию.

Я выбрал расширение NLineInputFormat, потому что я хотел иметь ту же гарантию, что у вас есть ровно N строк на расщепление.

Этот читатель записи берется почти как от http://bigdatacircus.com/2012/08/01/wordcount-with-custom-record-reader-of-textinputformat/

Единственное, что я модифицированными это свойство для maxLineLength что теперь использует новый API, и значение для NLINESTOPROCESS, который получает читать setNumLinesPerSplit() INSEAD NLineInputFormat о будучи жёстко (для большей гибкости).

Вот результат:

public class MultiLineInputFormat extends NLineInputFormat{ 
    @Override 
    public RecordReader<LongWritable, Text> createRecordReader(InputSplit genericSplit, TaskAttemptContext context) { 
     context.setStatus(genericSplit.toString()); 
     return new MultiLineRecordReader(); 
    } 

    public static class MultiLineRecordReader extends RecordReader<LongWritable, Text>{ 
     private int NLINESTOPROCESS; 
     private LineReader in; 
     private LongWritable key; 
     private Text value = new Text(); 
     private long start =0; 
     private long end =0; 
     private long pos =0; 
     private int maxLineLength; 

     @Override 
     public void close() throws IOException { 
      if (in != null) { 
       in.close(); 
      } 
     } 

     @Override 
     public LongWritable getCurrentKey() throws IOException,InterruptedException { 
      return key; 
     } 

     @Override 
     public Text getCurrentValue() throws IOException, InterruptedException { 
      return value; 
     } 

     @Override 
     public float getProgress() throws IOException, InterruptedException { 
      if (start == end) { 
       return 0.0f; 
      } 
      else { 
       return Math.min(1.0f, (pos - start)/(float)(end - start)); 
      } 
     } 

     @Override 
     public void initialize(InputSplit genericSplit, TaskAttemptContext context)throws IOException, InterruptedException { 
      NLINESTOPROCESS = getNumLinesPerSplit(context); 
      FileSplit split = (FileSplit) genericSplit; 
      final Path file = split.getPath(); 
      Configuration conf = context.getConfiguration(); 
      this.maxLineLength = conf.getInt("mapreduce.input.linerecordreader.line.maxlength",Integer.MAX_VALUE); 
      FileSystem fs = file.getFileSystem(conf); 
      start = split.getStart(); 
      end= start + split.getLength(); 
      boolean skipFirstLine = false; 
      FSDataInputStream filein = fs.open(split.getPath()); 

      if (start != 0){ 
       skipFirstLine = true; 
       --start; 
       filein.seek(start); 
      } 
      in = new LineReader(filein,conf); 
      if(skipFirstLine){ 
       start += in.readLine(new Text(),0,(int)Math.min((long)Integer.MAX_VALUE, end - start)); 
      } 
      this.pos = start; 
     } 

     @Override 
     public boolean nextKeyValue() throws IOException, InterruptedException { 
      if (key == null) { 
       key = new LongWritable(); 
      } 
      key.set(pos); 
      if (value == null) { 
       value = new Text(); 
      } 
      value.clear(); 
      final Text endline = new Text("\n"); 
      int newSize = 0; 
      for(int i=0;i<NLINESTOPROCESS;i++){ 
       Text v = new Text(); 
       while (pos < end) { 
        newSize = in.readLine(v, maxLineLength,Math.max((int)Math.min(Integer.MAX_VALUE, end-pos),maxLineLength)); 
        value.append(v.getBytes(),0, v.getLength()); 
        value.append(endline.getBytes(),0, endline.getLength()); 
        if (newSize == 0) { 
         break; 
        } 
        pos += newSize; 
        if (newSize < maxLineLength) { 
         break; 
        } 
       } 
      } 
      if (newSize == 0) { 
       key = null; 
       value = null; 
       return false; 
      } else { 
       return true; 
      } 
     } 
    } 

} 
Смежные вопросы