2013-12-20 5 views
0

Я пытаюсь использовать распределенный кеш-память, чтобы сохранить два источника ввода с картой.Распределенный кеш Hadoop

Итак, я делаю прототип, который объединяет два входных файла для использования распределенного кеша, и эта проблема успешно работает.

Однако распределенный кеш-ави не работает, если я пишу программу, которая воплощает несколько заданий для создания карт, а в программе вывод предыдущего задания используется как один из двух входных файлов в следующем задании. Однако распределенный кешированный файл ничего не испускает.

Вот мой водитель задания.

public int run(String[] args) throws Exception { 
    Path InputPath = new Path(args[0]); 
    Path Inter = new Path("Inters") ;//new Path(args[1]); 
    Path OutputPath = new Path(args[1]);   

    JobConf conf = new JobConf(getConf(), Temp.class); 
    FileSystem fs = FileSystem.get(getConf()); 
    conf.setJobName("wordcount"); 

    conf.setOutputKeyClass(Text.class); 
    conf.setOutputValueClass(IntWritable.class); 

    conf.setMapperClass(FirstMap.class); 
    //conf.setCombinerClass(Reduce.class); 
    conf.setReducerClass(Reduce.class); 

    conf.setMapOutputKeyClass(Text.class); 
    conf.setMapOutputValueClass(IntWritable.class); 
    conf.setInputFormat(TextInputFormat.class); 
    conf.setOutputFormat(TextOutputFormat.class); 
    //conf.setNumReduceTasks(0); 


    //20131220 - to deal with paths as variables 



    //fs.delete(Inter); 

    //DistributedCache.addCacheFile(new Path(args[2]).toUri(), conf); 
    FileInputFormat.setInputPaths(conf, InputPath); 
    FileOutputFormat.setOutputPath(conf, Inter); 
    conf.set("threshold", args[2]); 
    JobClient.runJob(conf); 


    // start job 2 

    JobConf conf2 = new JobConf(getConf(), Temp.class); 
    conf2.setJobName("shit"); 

    conf2.setMapOutputKeyClass(Text.class); 
    conf2.setMapOutputValueClass(IntWritable.class); 

    conf2.setOutputKeyClass(Text.class); 
    conf2.setOutputValueClass(IntWritable.class); 

    conf2.setMapperClass(Map.class); 
    //conf.setCombinerClass(Reduce.class); 
    conf2.setReducerClass(Reduce.class); 
    conf2.setNumReduceTasks(0); 
    conf2.setInputFormat(TextInputFormat.class); 
    conf2.setOutputFormat(TextOutputFormat.class); 


    //DistributedCache.addFileToClassPath(Inter, conf2); 
    //DistributedCache.addCacheFile(Inter.toUri(), conf2); 
    String InterToStroing = Inter.toString(); 
    Path Inters = new Path(InterToStroing); 

    DistributedCache.addCacheFile(new Path(args[3]).toUri(), conf2); 
    FileInputFormat.setInputPaths(conf2, InputPath); 
    FileOutputFormat.setOutputPath(conf2, OutputPath); 

    conf2.set("threshold", "0"); 
    JobClient.runJob(conf2); 

    return 0; 
} 

Также здесь представлена ​​функция карты, которая имеет дело с распределенным кешем.

public static class Map extends MapReduceBase implements 
     Mapper<LongWritable, Text, Text, IntWritable> { 

    static enum Counters { 
     INPUT_WORDS 
    } 

    private final static IntWritable one = new IntWritable(1); 
    private Text word = new Text(); 

    private boolean caseSensitive = true; 
    private Set<String> patternsToSkip = new HashSet<String>(); 

    private long numRecords = 0; 
    private String inputFile; 
    private Iterator<String> Iterator; 

    private Path[] localFiles; 
    public void configure (JobConf job) { 
     try { 
      localFiles = DistributedCache.getLocalCacheFiles(job); 
     } catch (IOException e) { 
      // TODO Auto-generated catch block 
      e.printStackTrace(); 
     } 
     for (Path patternsFile : localFiles) { 
      parseSkipFile(patternsFile); 
     } 
    } 
    private void parseSkipFile(Path patternsFile) { 
     try { 
      BufferedReader fis = new BufferedReader(new FileReader(
        patternsFile.toString())); 
      String pattern = null; 
      while ((pattern = fis.readLine()) != null) { 
       //String [] StrArr = pattern.split(" "); 
       System.err.println("Pattern : " + pattern); 
       patternsToSkip.add(pattern); 
      } 
     } catch (IOException ioe) { 
      System.err 
        .println("Caught exception while parsing the cached file '" 
          + patternsFile 
          + "' : " 
          + StringUtils.stringifyException(ioe)); 
     } 
    } 

    public void map(LongWritable key, Text value, 
      OutputCollector<Text, IntWritable> output, Reporter reporter) 
      throws IOException { 
     //output.collect(value, one); 


     ArrayList<String> temp = new ArrayList<String>(); 

     String line = value.toString(); 

     Iterator = patternsToSkip.iterator(); 


     while (Iterator.hasNext()) { 
      output.collect(new Text(Iterator.next()+"+"+value.toString()),one); 
     } 
     /*while (Iterator.hasNext()) { 
      output.collect(new Text(Iterator.next().toString()), one); 
     }*/ 
     //output.collect(value, one); 


    } 
} 

Любой, кто имел дело с этой проблемой?

ответ

1

Вот что я сделал, чтобы практиковать хаос. Он содержит несколько входных данных, а также работу по цепочке, что уменьшает боковое соединение в университетской компьютерной лаборатории.

public class StockJoinJob extends Configured { 

public static class KeyPartitioner extends Partitioner<TextIntPair, TextLongIntPair> { 
@Override 
public int getPartition(TextIntPair key, TextLongIntPair value, int numPartitions) { 
    return (key.getText().hashCode() & Integer.MAX_VALUE) % numPartitions; 
} 
} 

public static int runJob(String[] args) throws Exception { 
     Configuration conf = new Configuration(); 
     Job job = new Job(conf); 
    job.setJarByClass(StockJoinJob.class); 

    Path nasdaqPath = new Path(args[0]); 
    Path listPath = new Path(args[1]); 
    Path outputPath = new Path(args[2]+"-first"); 

    MultipleInputs.addInputPath(job, listPath, TextInputFormat.class, CompanyMapper.class); 
    MultipleInputs.addInputPath(job, nasdaqPath, 
    StockInputFormat.class, StockMapper.class); 
    FileOutputFormat.setOutputPath(job, outputPath); 

    job.setPartitionerClass(KeyPartitioner.class); 
    job.setGroupingComparatorClass(TextIntPair.FirstComparator.class); 

    job.setMapOutputKeyClass(TextIntPair.class); 
    job.setMapOutputValueClass(TextLongIntPair.class); 
    job.setReducerClass(JoinReducer.class); 

    job.setOutputKeyClass(TextIntPair.class); 
    job.setOutputValueClass(TextLongPair.class); 

    return job.waitForCompletion(true) ? 0 : 1; 
    } 

    public static int runJob2(String[] args) throws Exception { 
    //need first comparator like previous job 
    Configuration conf = new Configuration(); 
     Job job = new Job(conf); 

    job.setJarByClass(StockJoinJob.class); 
    job.setReducerClass(TotalReducer.class); 
     job.setMapperClass(TotalMapper.class); 
    Path firstPath = new Path(args[2]+"-first"); 
    Path outputPath = new Path(args[2]+"-second"); 

    //reducer output// 
    job.setOutputKeyClass(TextIntPair.class); 
    job.setOutputValueClass(TextLongPair.class); 

    //mapper output// 
    job.setMapOutputKeyClass(TextIntPair.class); 
    job.setMapOutputValueClass(TextIntPair.class);  

    //etc    
    FileInputFormat.setInputPaths(job, firstPath); 
    FileOutputFormat.setOutputPath(job, outputPath); 
    outputPath.getFileSystem(conf).delete(outputPath, true); 

    return job.waitForCompletion(true) ? 0 : 1; 
    } 



public static void main(String[] args) throws Exception { 
int firstCode = runJob(args); 
if(firstCode==0){ 
int secondCode =runJob2(args); 
    System.exit(secondCode); 
} 


} 
} 
0

Я не уверен, в чем проблема (возможно, вы должны перефразировать ее), но я бы посоветовал вам прочитать Yahoo tutorial on Chaining Jobs. Я вижу два варианта здесь:

  • Если вы точно такую ​​же карту и не заботиться о порядке исполнения (другими словами, два задания могут быть выполнены параллельно), я предложил бы создать одну единственную задание с двумя путями ввода. Вы можете сделать это с помощью команды:

    FileInputFormat.setInputPaths(conf, new Path(args[0])); FileInputFormat.addInputPath(conf, new Path(args[1]));

  • Я думаю, вам нужно добавить два отдельных драйвера работы в новом драйвере «цепи», а затем добавить зависимости (например, вторая работа зависит от первый и, следовательно, должен выполняться при завершении первого). Затем распределенный кеш может быть объявлен в драйвере второго задания. Надеюсь, это поможет ...

Смежные вопросы