2012-09-14 2 views
0

Я пытаюсь решить следующую проблему RecordReader. Пример входной файл:Hadoop Map-Reduce. RecordReader

1,1 
2,2 
3,3 
4,4 
5,5 
6,6 
7,7 
....... 
....... 

я хочу, чтобы мой RecordReader вернуть

key | Value 
0 |1,1:2,2:3,3:4,4:5,5 
4 |2,2:3,3:......6,6 
6 |3,3:4,4......6,6,7,7 

(для первого значения первых пяти линий, для второго значений пяти линий, начиная с 2-ем линии и по 3-й величине пяти линий, начиная с третья линия и т.д.)

public class MyRecordReader extends RecordReader<LongWritable, Text> {

@Override 
public boolean nextKeyValue() throws IOException, InterruptedException { 

    while (pos < end) { 
     key.set(pos); 
     // five line logic 
     Text nextLine=new Text(); 



     int newSize = in.readLine(value, maxLineLength, 
           Math.max((int)Math.min(Integer.MAX_VALUE, end-pos), 
             maxLineLength)); 
     fileSeek+=newSize; 

     for(int n=0;n<4;n++) 
     { 
      fileSeek+=in.readLine(nextLine, maxLineLength, 
        Math.max((int)Math.min(Integer.MAX_VALUE, end-pos), 
          maxLineLength)); 
      value.append(":".getBytes(), 0,1); 
      value.append(nextLine.getBytes(), 0, nextLine.getLength()); 
     } 
     if (newSize == 0) { 

     return false; 

     } 
     pos += newSize; 
     if (newSize < maxLineLength) { 

     return true; 
     } 

     // line too long. try again 
     LOG.info("Skipped line of size " + newSize + " at pos " + (pos - newSize)); 
    } 

    return false; 
} 

}

Но это возвращение значения, как

key | Value 
0 |1,1:2,2:3,3:4,4:5,5 
4 |6,6:7,7.......10,10 
6 |11,11:12,12:......14,14 

может кто-то помочь мне с этим кодом или свежий код для RecodeReader будет делать, а? Requirement of the problem (may help you understand the use case) Благодаря

+2

отформатируйте вопрос правильно и правильно показать выход, тогда мы сможем ответить ... – codeling

+0

@nyarlathotep: sry для плохого формата. il попытаться улучшить его, все еще мог hv помог мне справиться с ans. –

ответ

3

Я думаю, что я понимаю вопрос ... вот что я хотел бы сделать: обернуть другой RecordReader и буфер ключей/значений из него в локальной очереди.

public class MyRecordReader extends RecordReader<LongWritable, Text> { 
    private static final int BUFFER_SIZE = 5; 
    private static final String DELIMITER = ":"; 

    private Queue<String> valueBuffer = new LinkedList<String>(); 
    private Queue<Long> keyBuffer = new LinkedList<Long>(); 
    private LongWritable key = new LongWritable(); 
    private Text value = new Text(); 

    private RecordReader<LongWritable, Text> rr; 
    public MyRecordReader(RecordReader<LongWritable, Text> rr) { 
     this.rr = rr; 
    } 

    @Override 
    public void close() throws IOException { 
     rr.close(); 
    } 

    @Override 
    public LongWritable getCurrentKey() throws IOException, InterruptedException { 
     return key; 
    } 

    @Override 
    public Text getCurrentValue() throws IOException, InterruptedException { 
     return value; 
    } 

    @Override 
    public float getProgress() throws IOException, InterruptedException { 
     return rr.getProgress(); 
    } 

    @Override 
    public void initialize(InputSplit arg0, TaskAttemptContext arg1) 
      throws IOException, InterruptedException { 
     rr.initialize(arg0, arg1); 
    } 

    @Override 
    public boolean nextKeyValue() throws IOException, InterruptedException { 
     if (valueBuffer.isEmpty()) { 
      while (valueBuffer.size() < BUFFER_SIZE) { 
       if (rr.nextKeyValue()) { 
        keyBuffer.add(rr.getCurrentKey().get()); 
        valueBuffer.add(rr.getCurrentValue().toString()); 
       } else { 
        return false; 
       } 
      } 
     } else { 
      if (rr.nextKeyValue()) { 
       keyBuffer.add(rr.getCurrentKey().get()); 
       valueBuffer.add(rr.getCurrentValue().toString()); 
       keyBuffer.remove(); 
       valueBuffer.remove(); 
      } else { 
       return false; 
      } 
     } 
     key.set(keyBuffer.peek()); 
     value.set(getValue()); 
     return true; 
    } 

    private String getValue() { 
     StringBuilder sb = new StringBuilder(); 
     Iterator<String> iter = valueBuffer.iterator(); 
     while (iter.hasNext()) { 
      sb.append(iter.next()); 
      if (iter.hasNext()) sb.append(DELIMITER); 
     } 
     return sb.toString(); 
    } 

} 

Тогда, например, вы можете иметь пользовательские InputFormat, расширяющие TextInputFormat и переопределяет метод createRecordReader для вызова super.createRecordReader и возвращают этот результат, завернутый в MyRecordReader, как это:

public class MyTextInputFormat extends TextInputFormat { 
    @Override 
    public RecordReader<LongWritable, Text> createRecordReader(
       InputSplit arg0, TaskAttemptContext arg1) { 
     return new MyRecordReader(super.createRecordReader(arg0, arg1)); 
    } 
} 
+0

К сожалению, я не тестировал код перед его запуском. Я отредактировал его, попробуйте сейчас. –

+0

Спасибо @joe K: Он работает отлично. –

Смежные вопросы