2015-05-10 4 views
0

В течение последних нескольких дней я учил себя Hadoop и пытался реализовать базовые алгоритмы BFS в соответствии с информацией, приведенной в this webpage. Я должен был внести некоторые изменения и дополнения, чтобы скомпилировать код. Я получил следующую ошибку при запуске и даже после отработки часов часов, я не мог решить эту проблему. Может ли кто-нибудь помочь мне в этом?Отладка в Hadoop - MapReduce. Mapper не вызывается?

ОШИБКА:

15/05/11 03:04:20 WARN mapred.LocalJobRunner: job_local934121164_0001 
java.lang.Exception: java.io.IOException: Type mismatch in key from map: expected org.apache.hadoop.io.Text, received org.apache.hadoop.io.LongWritable 
    at org.apache.hadoop.mapred.LocalJobRunner$Job.runTasks(LocalJobRunner.java:462) 
    at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:522) 
Caused by: java.io.IOException: Type mismatch in key from map: expected org.apache.hadoop.io.Text, received org.apache.hadoop.io.LongWritable 
    at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.collect(MapTask.java:1072) 
    at org.apache.hadoop.mapred.MapTask$NewOutputCollector.write(MapTask.java:715) 
    at org.apache.hadoop.mapreduce.task.TaskInputOutputContextImpl.write(TaskInputOutputContextImpl.java:89) 
    at org.apache.hadoop.mapreduce.lib.map.WrappedMapper$Context.write(WrappedMapper.java:112) 
    at org.apache.hadoop.mapreduce.Mapper.map(Mapper.java:125) 
    at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:146) 
    at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:787) 
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:341) 
    at org.apache.hadoop.mapred.LocalJobRunner$Job$MapTaskRunnable.run(LocalJobRunner.java:243) 
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    at java.lang.Thread.run(Thread.java:745) 
15/05/11 03:04:21 INFO mapreduce.Job: Job job_local934121164_0001 running in uber mode : false 
15/05/11 03:04:21 INFO mapreduce.Job: map 0% reduce 0% 

Что не должно произойти как в картографа, а также редуктор я отслеживаю и тот же ключ, типы значений, как вы можете увидеть ниже. Единственное, что, я думаю, здесь происходит, это то, что мой класс сопоставления не используется, и вместо этого используется значение по умолчанию (которое испускает LongWritable). Я не уверен, что я делаю неправильно здесь.

SearchMapper.java

import java.io.IOException; 
import org.apache.hadoop.mapreduce.Mapper; 
import org.apache.hadoop.mapreduce.Mapper.Context; 

import org.apache.hadoop.io.Text; 
import org.apache.hadoop.io.LongWritable; 
public class SearchMapper extends Mapper<Object, Text, Text, Text> { 

    // Types of the input key, input value and the Context object through which 
    // the Mapper communicates with the Hadoop framework 
    public void map(Object key, Text value, Context context, Node inNode) 
      throws IOException, InterruptedException { 

     // For each GRAY node, emit each of the adjacent nodes as a new node 
     // (also GRAY) if the adjacent node is already processed 
     // and colored BLACK, the reducer retains the color BLACK 
     // Note that the mapper is not differentiating between BLACK GREY AND WHITE 

     if (inNode.getColor() == Node.Color.GRAY) { 
      for (String neighbor : inNode.getEdges()) { 
       Node adjacentNode = new Node(); 

       // Remember that the current node only has the value the id 
       // of its neighbour, and not the object itself. Therefore at 
       // this stage there is no way of knowing and assigning any of 
       // its other properties. Also remember that the reducer is doing 
       // the 'clean up' task and not the mapper. 
       adjacentNode.setId(neighbor); 
       adjacentNode.setDistance(inNode.getDistance() + 1); 
       adjacentNode.setColor(Node.Color.GRAY); 
       adjacentNode.setParent(inNode.getId()); 
       context.write(new Text(adjacentNode.getId()), adjacentNode.getNodeInfo()); // Get nodeinfo returns a Text Object 
      } 
      inNode.setColor(Node.Color.BLACK); 
     } 
     // Emit the input node, other wise the BLACK color change(if it happens) 
     // Wont be persistent 
     context.write(new Text(inNode.getId()), inNode.getNodeInfo()); 

    } 
} 

SearchReducer.java

import org.apache.hadoop.mapreduce.Reducer; 
import org.apache.hadoop.io.Text; 
import java.io.IOException; 

public class SearchReducer extends Reducer<Text, Text, Text, Text> { 

    // Types of the input key, the values associated with the key, the Context object for Reducers communication 
    // with the Hadoop framework and the node whose information has to be output 
    // the return type is a Node 
    public Node reduce(Text key, Iterable<Text> values, Context context, Node outNode) 
      throws IOException, InterruptedException { 

     // set the node id as the key 
     outNode.setId(key.toString()); 

     // TODO : (huh?) Since the values are of the type Iterable, iterate through the values associated with the key 
     // for all the values corresponding to a particular node id 

     for (Text value : values) { 

      Node inNode = new Node(key.toString() + "\t" + value.toString()); 

      // Emit one node after combining all the mapper outputs 

      // Only one node(the original) will have a non-null adjascency list 
      if (inNode.getEdges().size() > 0) { 
       outNode.setEdges(inNode.getEdges()); 
      } 

      // Save the minimum distance and parent 
      if (inNode.getDistance() < outNode.getDistance()) { 
       outNode.setDistance(inNode.getDistance()); 
       outNode.setParent(inNode.getParent()); 
      } 

      // Save the darkest color 
      if (inNode.getColor().ordinal() > outNode.getColor().ordinal()) { 
       outNode.setColor(inNode.getColor()); 
      }   
     } 
     context.write(key, new Text(outNode.getNodeInfo()));  
     return outNode; 
    } 
} 

BaseJob.java (универсальный класс веб-сайт упомянутый ниже, который в основном определяет работу вверх)

import org.apache.hadoop.conf.Configuration; 
import org.apache.hadoop.conf.Configured; 
import org.apache.hadoop.mapreduce.Job; 
import org.apache.hadoop.mapreduce.Mapper; 
import org.apache.hadoop.mapreduce.Reducer; 
import org.apache.hadoop.util.Tool; 
import org.apache.hadoop.io.Text; 
import java.io.IOException; 

public abstract class BaseJob extends Configured implements Tool { 

     protected Job setupJob(String jobName,JobInfo jobInfo) throws Exception { 

     Job job = new Job(new Configuration(), jobName); 
     job.setJarByClass(jobInfo.getJarByClass()); 

     job.setMapperClass(jobInfo.getMapperClass()); 
     if (jobInfo.getCombinerClass() != null) 
      job.setCombinerClass(jobInfo.getCombinerClass()); 
     job.setReducerClass(jobInfo.getReducerClass()); 

     // TODO : set number of reducers as required 
     job.setNumReduceTasks(3); 

     job.setOutputKeyClass(jobInfo.getOutputKeyClass()); 
     job.setOutputValueClass(jobInfo.getOutputValueClass()); 
     /* 
     job.setJarByClass(SSSPJob.class); 
     job.setMapperClass(SearchMapper.class); 
     job.setReducerClass(SearchReducer.class); 
     job.setNumReduceTasks(3); 
     job.setOutputValueClass(Text.class); 
     job.setOutputKeyClass(Text.class);*/ 
     return job; 
    } 

    // Implement an abstract class for JobInfo object 
    protected abstract class JobInfo { 
     public abstract Class<?> getJarByClass(); 
     public abstract Class<? extends Mapper> getMapperClass(); 
     public abstract Class<? extends Reducer> getCombinerClass(); 
     public abstract Class<? extends Reducer> getReducerClass(); 
     public abstract Class<?> getOutputKeyClass(); 
     public abstract Class<?> getOutputValueClass(); 

    } 
} 

SSSPJob.java (водитель)

import org.apache.hadoop.conf.Configuration; 
import org.apache.hadoop.conf.Configured; 
import org.apache.hadoop.mapreduce.Job; 
import org.apache.hadoop.mapreduce.Mapper; 
import org.apache.hadoop.mapreduce.Reducer; 
import org.apache.hadoop.util.Tool; 
import org.apache.hadoop.util.ToolRunner; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.mapreduce.Counters; 
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 
import org.apache.hadoop.fs.Path; 

public class SSSPJob extends BaseJob { 
    // method to set the configuration for the job and the mapper and the reducer classes 
    private Job getJobConf(String[] args) 
     throws Exception { 

    // Defining the abstract class objects 
     JobInfo jobInfo = new JobInfo() { 
      @Override 
      public Class<? extends Reducer> getCombinerClass() { 
       return null; 
      } 

      @Override 
      public Class<?> getJarByClass() { 
       return SSSPJob.class; 
      } 

      @Override 
      public Class<? extends Mapper> getMapperClass() { 
       return SearchMapper.class; 
      } 

      @Override 
      public Class<?> getOutputKeyClass() { 
       return Text.class; 
      } 

      @Override 
      public Class<?> getOutputValueClass() { 
       return Text.class; 
      } 

      @Override 
      public Class<? extends Reducer> getReducerClass() { 
       return SearchReducer.class; 
      } 
     }; 

     return setupJob("ssspjob", jobInfo); 

    } 

    // the driver to execute the job and invoke the map/reduce functions 

    public int run(String[] args) throws Exception { 
     int iterationCount = 0; 
     Job job; 
     // No of grey nodes 
     long terminationValue = 1; 

     while(terminationValue >0){ 
      job = getJobConf(args); 
      String input, output; 

      // Setting the input file and output file for each iteration 
      // During the first time the user-specified file will be the 
      // input whereas for the subsequent iterations 
      // the output of the previous iteration will be the input 
      // NOTE: Please be clear of how the input output files are set 
      //  before proceding. 

      // for the first iteration the input will be the first input argument 
      if (iterationCount == 0) 
       input = args[0]; 
      else 
       // for the remaining iterations, the input will be the output of the previous iteration 
       input = args[1] + iterationCount; 

      output = args[1] + (iterationCount + 1); 

      FileInputFormat.setInputPaths(job, new Path(input)); 
      FileOutputFormat.setOutputPath(job, new Path(output)); 

      job.waitForCompletion(true); 

      Counters jobCntrs = job.getCounters(); 
      terminationValue = jobCntrs.findCounter(MoreIterations.numberOfIterations).getValue(); 
      // if the counter's value is incremented in the reducer(s), then there are more 
      // GRAY nodes to process implying that the iteration has to be continued. 
      iterationCount++; 
     } 
     return 0; 
    } 

    public static void main(String[] args) throws Exception { 

     int res = ToolRunner.run(new Configuration(), new SSSPJob(), args); 
     if(args.length != 2){ 
      System.err.println("Usage: <in> <output name> "); 
      System.exit(1); 
      System.out.println("Huh?"); 
     } 
     System.exit(res); 
    } 

} 

И кроме того, я не уверен, как отладка выполняется на Hadoop. Все мои отладочные заявления печати, похоже, не имеют никакого эффекта, и я подозреваю, что структура hadoop записывает сообщения журнала в другое место или файл. Спасибо :)

ответ

0

Ключ в задании MR должен реализовывать WritableComparable, а значение должно реализовывать Writable. Я думаю, что ваш код mapper использует экземпляр типа «Объект». Просто добавьте аннотацию @Override перед вашей картой и уменьшите методы, чтобы они отображали ошибки. В противном случае вы не видите никаких ошибок, но поскольку подпись не соответствует, вызывается по умолчанию IdentityMapper и, следовательно, ошибка. Если вы обрабатываете текстовый файл, ключ для метода карты должен быть LongWritable, если вы хотите использовать пользовательский ключ, он должен реализовать WritableComparable.

+0

Спасибо, я вижу ошибку при добавлении @ Переопределить. Можете ли вы указать мне ресурс, объясняющий ПОЧЕМУ и как это требуется. Могу ли я изменить сигнатуру метода карты на дополнительный параметр, как показано выше, если я реализую совместимый с возможностью записи интерфейс? – metastableB