Вам нужен обычай RecordReader
, чтобы сделать это:
public class TrimmedRecordReader implements RecordReader<LongWritable, Text> {
private LineRecordReader lineReader;
private LongWritable lineKey;
private Text lineValue;
public TrimmedRecordReader(JobConf job, FileSplit split) throws IOException {
lineReader = new LineRecordReader(job, split);
lineKey = lineReader.createKey();
lineValue = lineReader.createValue();
}
public boolean next(LongWritable key, Text value) throws IOException {
if (!lineReader.next(lineKey, lineValue)) {
return false;
}
String[] fields = lineValue.toString().split(",");
if (fields.length < 3) {
throw new IOException("Invalid record received");
}
value.set(fields[0] + "," + fields[1] + "," + fields[2]);
return true;
}
public LongWritable createKey() {
return lineReader.createKey();
}
public Text createValue() {
return lineReader.createValue();
}
public long getPos() throws IOException {
return lineReader.getPos();
}
public void close() throws IOException {
lineReader.close();
}
public float getProgress() throws IOException {
return lineReader.getProgress();
}
}
Это должно быть довольно очевидно , просто обернуть LineRecordReader
. К сожалению, для его вызова вам необходимо также расширить InputFormat
. Достаточно:
public class TrimmedTextInputFormat extends FileInputFormat<LongWritable, Text> {
public RecordReader<LongWritable, Text> getRecordReader(InputSplit input,
JobConf job, Reporter reporter) throws IOException {
reporter.setStatus(input.toString());
return new TrimmedRecordReader(job, (FileSplit) input);
}
}
Только не забудьте установить его в драйвере.
какой api u r используемый - обозначенный или новый? – blackSmith