Я использую MultipleOutputs в своем редукторе, так как я хочу иметь отдельный файл результатов для каждого ключа, однако каждый из файлов результатов пуст, хотя файл результата по умолчанию part-r-xxxx создан и содержит правильные значения.Пустые выходные файлы с использованием MapReduce MultipleOutputs
Это мой код для JobDriver и Reducer
Главный класс
public static void main(String[] args) throws Exception {
int currentIteration = 0;
int reducerCount, roundCount;
Configuration conf = createConfiguration(currentIteration);
cleanEnvironment(conf);
Job job = new Job(conf, "cfim");
//Input and output format configuration
job.setMapperClass(TransactionsMapper.class);
job.setReducerClass(PatriciaReducer.class);
job.setInputFormatClass(TransactionInputFormat.class);
job.setMapOutputKeyClass(LongWritable.class);
job.setMapOutputValueClass(Text.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
reducerCount = roundCount = Math.floorDiv(getRoundCount(conf), Integer.parseInt(conf.get(MRConstants.mergeFactorSpecifier)));
FileInputFormat.addInputPath(job, new Path("/home/cloudera/datasets/input"));
Path outputPath = new Path(String.format(MRConstants.outputPathFormat, outputDir, currentIteration));
FileOutputFormat.setOutputPath(job, outputPath);
MultipleOutputs.addNamedOutput(job, "key", TextOutputFormat.class, LongWritable.class, Text.class);
job.waitForCompletion(true);
Reducer класса
public class PatriciaReducer extends Reducer<LongWritable, Text, LongWritable, Text> {
private ITreeManager treeManager;
private SerializationManager serializationManager;
private MultipleOutputs<LongWritable, Text> mos;
@Override
protected void setup(Context context) throws IOException ,InterruptedException {
treeManager = new PatriciaTreeManager();
serializationManager = new SerializationManager();
mos = new MultipleOutputs<LongWritable, Text>(context);
}
@Override
protected void reduce(LongWritable key, Iterable<Text> items, Context context)
throws IOException, InterruptedException {
Iterator<Text> patriciaIterator = items.iterator();
PatriciaTree tree = new PatriciaTree();
if (patriciaIterator.hasNext()){
Text input = patriciaIterator.next();
tree = serializationManager.deserializePatriciaTree(input.toString());
}
while(patriciaIterator.hasNext()){
Text input = patriciaIterator.next();
PatriciaTree mergeableTree = serializationManager.deserializePatriciaTree(input.toString());
tree = treeManager.mergeTree(tree, mergeableTree, false);
}
Text outputValue = new Text(serializationManager.serializeAsJson(tree));
mos.write("key", key, outputValue, generateOutputPath(key));
context.write(key, outputValue);
}
@Override
protected void finalize() throws Throwable {
// TODO Auto-generated method stub
super.finalize();
mos.close();
}
private String generateOutputPath(LongWritable key) throws IOException {
String outputPath = String.format("%s-%s", MRConstants.reduceResultValue, key.toString());
return outputPath;
}
}
я делаю что-то не так?
Этот вопрос кажется немного не по теме. Что вы пробовали до сих пор, чтобы решить проблему? Отладка или тестирование определенных сценариев? –
Ну, я заметил, что файлы результатов созданы, однако они пусты, но результата не так, поскольку я использую встроенный выходной формат, я просто попытался найти похожие проблемы в сети. –
Кажется, что вы уже нашли решение самостоятельно. Не забудьте отметить свой ответ в качестве принятого решения. Или подумайте о том, чтобы удалить свой вопрос, если он настолько специфичен, что другие не выиграют от него. –