2013-02-08 6 views
1

Привет, У меня возникли проблемы с использованием распределенного кеша Hadoop. Я запускаю Hadoop в одном кластере узлов (http://www.michael-noll.com/tutorials/running-hadoop-on-ubuntu-linux-single-node-cluster/).Проблемы с использованием распределенного кэша Hadoop

Проблема, что мне нужно передать файл каждому картографу, я уже много читал о распределенном кэше Hadoop, но до сих пор у меня не было никаких попыток каждый раз, когда я пытаюсь открыть локальный файл, я получаю «FileNotFoundException», How Я мог быть уверен, что кэш действительно справляется с файлом?

Спасибо за любую помощь

Вот мой код:

package br.ufmg.dcc.bigdata.hadoop;           
    import java.io.IOException;           
    import java.util.*;           
    import org.apache.hadoop.filecache.DistributedCache;           
    import org.apache.hadoop.fs.FSDataInputStream;           
    import org.apache.hadoop.fs.Path;           
    import org.apache.hadoop.fs.FileSystem;           
    import org.apache.hadoop.conf.*;           
    import org.apache.hadoop.io.*;           
    import org.apache.hadoop.mapred.JobConf;           
    import org.apache.hadoop.mapreduce.*;           
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;           
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;           
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;           
    import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;           
    import br.ufmg.dcc.bigdata.Result;           
    import au.com.bytecode.opencsv.CSVReader;           
    import java.io.BufferedInputStream;           
    import java.io.File;           
    import java.io.FileInputStream;           
    import java.io.FileOutputStream;           
    import java.io.FileReader;           
    import java.io.InputStream;           
    import java.io.ObjectInput;           
    import java.io.ObjectInputStream;           
    import java.io.ObjectOutputStream;           
    import java.io.StringReader;           
    import java.io.InputStreamReader;           
    import java.io.BufferedReader;           
    import java.net.URI;           
    import weka.core.Instances;           
    import weka.classifiers.rules.LAC;           


    public class Ladoop {           

     public static class Map extends Mapper<Text, Text, Text, IntWritable> {           

      //private           
      private final static IntWritable one = new IntWritable(1);           
      private LAC classifier;           
      private Path[] localFiles;           
      private final static Text missesText = new Text("misses");           
      private final static Text hitsText = new Text("hits");           


      protected void setup(Context context) throws IOException, InterruptedException {           


       FileReader teste = new FileReader("dilma_00.lac"); //error in this line           
       classifier = new LAC("/home/hduser/dilma_00.lac"); //There is no problem if I force to read the local file           
      }           



      public void map(Text key, Text value, Context context) throws IOException, InterruptedException {           
       String line = value.toString();           
       try           
       {           
        Result result = this.classifier.distributionForInstance(line.split(" "));           
        context.write(missesText, new IntWritable(result.getMisses()));           
        context.write(hitsText, new IntWritable(result.getHits()));           
       } catch (Exception e) {           
        System.out.println("MAP ERROR");           
        e.printStackTrace();           
       }           
      }           

     }           


     public static class Reduce extends Reducer<Text, IntWritable, Text, IntWritable> {           

      public void reduce(Text key, Iterable<IntWritable> results, Context context)           
        throws IOException, InterruptedException {           
       int value  = 0;           

       for (IntWritable result : results) {           
        value += result.get();           
       }           
       System.out.println(value);           
       context.write(key, new IntWritable(value));           

      }           
     }           

     public static void main(String[] args) throws Exception {           
      Configuration conf = new Configuration();           

      Job job = new Job(conf, "Ladoop");           
      DistributedCache.addCacheFile(new URI("/user/hduser/dilma_00.lac#dilma_00.lac"), conf);  




      DistributedCache.createSymlink(conf);           


      job.setJarByClass(Ladoop.class);           

      job.setOutputKeyClass(Text.class);           
      job.setOutputValueClass(IntWritable.class);           

      job.setMapperClass(Map.class);           
      job.setReducerClass(Reduce.class);           

      job.setInputFormatClass(NonSplittableKeyValueTextInputFormat.class);           
      job.setOutputFormatClass(TextOutputFormat.class);           

      FileInputFormat.addInputPath(job, new Path(args[0]));           
      FileOutputFormat.setOutputPath(job, new Path(args[1]));           

      job.waitForCompletion(true);           
     }           

    } 

Edit: Пытался также с, но не везло.

 protected void setup(Context context) throws IOException, InterruptedException { 
     Path[] cacheFiles = DistributedCache.getLocalCacheFiles(context.getConfiguration()); 
     FileInputStream fileStream = new FileInputStream(cacheFiles[0].toString()); 
     classifier = new LAC(cacheFiles[0].toString()); 
    } 

ответ

0

Я не думаю, что вы можете получить доступ к файлам непосредственно после ввода их в DistributedCache, вы должны добавить что-то вроде этого в вашем setup код:

Path[] cacheFiles = context.getLocalCacheFiles(); 
FileInputStream fileStream = new FileInputStream(cacheFiles[0].toString()); 
+0

Не повезло с этим, отредактировал мой вопрос с использованием кода. – Alessandro

Смежные вопросы