2015-09-27 2 views
1

Я использую MultipleOutputs в своем редукторе, так как я хочу иметь отдельный файл результатов для каждого ключа, однако каждый из файлов результатов пуст, хотя файл результата по умолчанию part-r-xxxx создан и содержит правильные значения.Пустые выходные файлы с использованием MapReduce MultipleOutputs

Это мой код для JobDriver и Reducer

Главный класс

public static void main(String[] args) throws Exception { 
    int currentIteration = 0; 
    int reducerCount, roundCount; 

    Configuration conf = createConfiguration(currentIteration); 
    cleanEnvironment(conf); 
    Job job = new Job(conf, "cfim"); 

    //Input and output format configuration 
    job.setMapperClass(TransactionsMapper.class); 
    job.setReducerClass(PatriciaReducer.class); 

    job.setInputFormatClass(TransactionInputFormat.class); 
    job.setMapOutputKeyClass(LongWritable.class); 
    job.setMapOutputValueClass(Text.class); 

    job.setOutputKeyClass(Text.class); 
    job.setOutputValueClass(Text.class); 

    reducerCount = roundCount = Math.floorDiv(getRoundCount(conf), Integer.parseInt(conf.get(MRConstants.mergeFactorSpecifier))); 

    FileInputFormat.addInputPath(job, new Path("/home/cloudera/datasets/input")); 
    Path outputPath = new Path(String.format(MRConstants.outputPathFormat, outputDir, currentIteration)); 
    FileOutputFormat.setOutputPath(job, outputPath); 
    MultipleOutputs.addNamedOutput(job, "key", TextOutputFormat.class, LongWritable.class, Text.class); 

    job.waitForCompletion(true); 

Reducer класса

public class PatriciaReducer extends Reducer<LongWritable, Text, LongWritable, Text> { 

private ITreeManager treeManager; 
private SerializationManager serializationManager; 
private MultipleOutputs<LongWritable, Text> mos; 

@Override 
protected void setup(Context context) throws IOException ,InterruptedException { 
    treeManager = new PatriciaTreeManager(); 
    serializationManager = new SerializationManager(); 
    mos = new MultipleOutputs<LongWritable, Text>(context); 
} 

@Override 
protected void reduce(LongWritable key, Iterable<Text> items, Context context) 
     throws IOException, InterruptedException { 

    Iterator<Text> patriciaIterator = items.iterator(); 
    PatriciaTree tree = new PatriciaTree(); 

    if (patriciaIterator.hasNext()){ 
     Text input = patriciaIterator.next(); 
     tree = serializationManager.deserializePatriciaTree(input.toString()); 
    } 

    while(patriciaIterator.hasNext()){ 
     Text input = patriciaIterator.next(); 
     PatriciaTree mergeableTree = serializationManager.deserializePatriciaTree(input.toString()); 
     tree = treeManager.mergeTree(tree, mergeableTree, false); 
    } 

    Text outputValue = new Text(serializationManager.serializeAsJson(tree)); 
    mos.write("key", key, outputValue, generateOutputPath(key)); 
    context.write(key, outputValue); 
} 

@Override 
protected void finalize() throws Throwable { 
    // TODO Auto-generated method stub 
    super.finalize(); 
    mos.close(); 
} 

private String generateOutputPath(LongWritable key) throws IOException { 
    String outputPath = String.format("%s-%s", MRConstants.reduceResultValue, key.toString()); 
    return outputPath; 
} 

}

я делаю что-то не так?

+0

Этот вопрос кажется немного не по теме. Что вы пробовали до сих пор, чтобы решить проблему? Отладка или тестирование определенных сценариев? –

+0

Ну, я заметил, что файлы результатов созданы, однако они пусты, но результата не так, поскольку я использую встроенный выходной формат, я просто попытался найти похожие проблемы в сети. –

+0

Кажется, что вы уже нашли решение самостоятельно. Не забудьте отметить свой ответ в качестве принятого решения. Или подумайте о том, чтобы удалить свой вопрос, если он настолько специфичен, что другие не выиграют от него. –

ответ

1

Я выяснил, что использовал неправильный метод для закрытия нескольких объектов вывода. После закрытия MultipleOutputs в методе очистки, а не в методе окончательной доработки, все работает отлично

Смежные вопросы