2012-04-25 2 views
0

Я пытаюсь использовать реплицированное соединение с использованием распределенного кеша как на кластере, так и на интерфейсе karmasphere. Я вставил код ниже. Моя программа не может найти файл в кэш-памятиРепликативное соединение с использованием распределенного кеша в Hadoop 0.20

import java.io.BufferedReader; 
import java.io.FileReader; 
import java.io.IOException; 
import java.util.Hashtable; 

import org.apache.hadoop.conf.Configuration; 
import org.apache.hadoop.conf.Configured; 
import org.apache.hadoop.filecache.DistributedCache; 
import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.mapred.FileInputFormat; 
import org.apache.hadoop.mapred.FileOutputFormat; 
import org.apache.hadoop.mapred.InputFormat; 
import org.apache.hadoop.mapred.JobClient; 
import org.apache.hadoop.mapred.JobConf; 
import org.apache.hadoop.mapred.KeyValueTextInputFormat; 
import org.apache.hadoop.mapred.MapReduceBase; 
import org.apache.hadoop.mapred.Mapper; 
import org.apache.hadoop.mapred.OutputCollector; 
import org.apache.hadoop.mapred.Reporter; 
import org.apache.hadoop.mapred.TextOutputFormat; 
import org.apache.hadoop.util.Tool; 
import org.apache.hadoop.util.ToolRunner; 

// A demostration of Hadoop's DistributedCache tool 
// 

public class MapperSideJoinWithDistributedCache extends Configured implements Tool { 
     private final static String inputa = "C:/Users/LopezGG/workspace/Second_join/input1_1" ; 
public static class MapClass extends MapReduceBase implements Mapper<Text, Text, Text, Text> { 

    private Hashtable<String, String> joinData = new Hashtable<String, String>(); 

    @Override 
    public void configure(JobConf conf) { 
    try { 
     Path [] cacheFiles = DistributedCache.getLocalCacheFiles(conf); 
      System.out.println("ds"+DistributedCache.getLocalCacheFiles(conf)); 
     if (cacheFiles != null && cacheFiles.length > 0) { 
     String line; 
     String[] tokens; 
     BufferedReader joinReader = new BufferedReader(new FileReader(cacheFiles[0].toString())); 

     try { 
      while ((line = joinReader.readLine()) != null) { 
      tokens = line.split(",", 2); 
      joinData.put(tokens[0], tokens[1]); 
     } 
     } finally { 
      joinReader.close(); 
     } 
     } 
     else 
      System.out.println("joinreader not set"); 
    } catch(IOException e) { 
     System.err.println("Exception reading DistributedCache: " + e); 
    } 
    } 

    public void map(Text key, Text value, OutputCollector<Text, Text> output, Reporter reporter) throws IOException { 
    String joinValue = joinData.get(key.toString()); 
    if (joinValue != null) { 
    output.collect(key,new Text(value.toString() + "," + joinValue)); 
    } 
    } 
} 


public int run(String[] args) throws Exception { 
    Configuration conf = getConf(); 
    JobConf job = new JobConf(conf, MapperSideJoinWithDistributedCache.class); 

    DistributedCache.addCacheFile(new Path(args[0]).toUri(), job); 
    //System.out.println(DistributedCache.addCacheFile(new Path(args[0]).toUri(), conf)); 
    Path in = new Path(args[1]); 
    Path out = new Path(args[2]); 
    FileInputFormat.setInputPaths(job, in); 
    FileOutputFormat.setOutputPath(job, out); 
    job.setJobName("DataJoin with DistributedCache"); 
    job.setMapperClass(MapClass.class); 
    job.setNumReduceTasks(0); 
    job.setInputFormat(KeyValueTextInputFormat.class); 
    job.setOutputFormat(TextOutputFormat.class); 
    job.set("key.value.separator.in.input.line", ","); 
    JobClient.runJob(job); 
    return 0; 
} 

    public static void main(String[] args) throws Exception { 
       long time1= System.currentTimeMillis(); 
       System.out.println(time1); 
     int res = ToolRunner.run(new Configuration(), 
     new MapperSideJoinWithDistributedCache(),args); 
      long time2= System.currentTimeMillis(); 
      System.out.println(time2); 
      System.out.println("millsecs elapsed:"+(time2-time1)); 
     System.exit(res); 

    } 
} 

Ошибки я получаю

O mapred.MapTask: numReduceTasks: 0 
Exception reading DistributedCache: java.io.FileNotFoundException: \tmp\hadoop-LopezGG\mapred\local\archive\-2564469513526622450_-1173562614_1653082827\file\C\Users\LopezGG\workspace\Second_join\input1_1 (The system cannot find the file specified) 
ds[Lorg.apache.hadoop.fs.Path;@366a88bb 
12/04/24 23:15:01 INFO mapred.Task: Task:attempt_local_0001_m_000000_0 is done. And is in the process of commiting 
12/04/24 23:15:01 INFO mapred.LocalJobRunner: 

Но задача выполняется до завершения. Coudl кто-то, пожалуйста, помогите мне> я просмотрел другие сообщения и сделал все изменения, но все равно не работает

ответ

0

Должен признаться, что никогда не использовал класс DistributedCache (скорее, я использую опцию -files через GenericOptionsParser) но я не уверен, что DistributedCache автоматически копирует локальные файлы в HDFS до запуска вашей работы.

В то время как я не могу найти какие-либо доказательства этого факта в документации Hadoop, есть упоминание в Pro Hadoop книге, которая упоминает что-то на этот счет:

В ваш случай, сначала скопируйте файл в HDFS, а когда вы вызовете DistributedCache.addCacheFile, передайте URI файла в HDFS и посмотрите, работает ли это для вас

Смежные вопросы