2013-09-22 2 views
0

Я только начал изучать Hadoop, и я извлек пример из книги. Поэтому я создал MapReducer для локального запуска, который извлекает температуры из файлов данных NCDC. Это образец данных:Hadoop: странный NullPointer Исключение работает в локальном режиме

0143023780999992012010100004+61450+017167FM-12+002799999V0209999C...cut...; 

Каждый файл (я скачал около 100 файлов) состоит из многих линий, как это.

My mapper выполняет простые операции синтаксического анализа, чтобы извлечь температуру из этих файлов. Весь процесс вернет максимальную температуру.

Mapper и относительные тесты:

public class MaxTemperatureMapper extends Mapper<LongWritable,Text,Text,IntWritable> { 

@Override 
public void map(LongWritable key, Text value, Context context) { 
    String record = value.toString(); 
    String year = record.substring(15,19); 
    int airTemperature = extractTemp(record); 
    if (isNotValidTemp(record, airTemperature)) return; 
    try { 
     context.write(new Text(year), new IntWritable(airTemperature)); 
    } catch (IOException e) { 
     e.printStackTrace(); 
    } catch (InterruptedException e) { 
     e.printStackTrace(); 
    } 
} 

private boolean isNotValidTemp(String record, int airTemperature) { 
    return airTemperature == 9999 || !record.substring(92, 93).matches("[01459]"); 
} 

private int extractTemp(String record) { 
    String temp = (record.charAt(87) == '+') 
      ? record.substring(88,92) 
      : record.substring(87,92); 
    return Integer.parseInt(temp); 
} 

} 

public class MaxTemperatureMapperTest { 

@Test 
public void processRecord() { 
    Text value = new Text("0111011120999992012010100004+65450+012217FM-12+000999999V0201301N014019999999N9999999N1+00031-00791099271ADDMA1999999099171MD1810341+9999REMSYN070AAXX 01001 01112 46/// /1314 10003 21079 39917 49927 58034 333 91124;"); 

    new MapDriver<LongWritable, Text, Text, IntWritable>() 
      .withMapper(new MaxTemperatureMapper()) 
      .withInputValue(value) 
      .withOutput(new Text("2012"), new IntWritable(3)) 
      .runTest(); 
} 

@Test 
public void processRecordsFromSuspiciousFile() throws IOException { 
    final InputStream is = getClass().getClassLoader().getSystemResource("023780-99999-2012").openStream(); 
    BufferedReader br = new BufferedReader(new InputStreamReader(is)); 
    String line; 
    Iterator<Integer> ii = Arrays.asList(-114, -120, -65, -45, 1, 4, 6, 6, 10, 16, 18, 29, 32, 17, 7, 16, 22, 8, 8, 20).iterator(); 
    while ((line = br.readLine()) != null) { 
     new MapDriver<LongWritable, Text, Text, IntWritable>() 
       .withMapper(new MaxTemperatureMapper()) 
       .withInputValue(new Text(line)) 
       .withOutput(new Text("2012"), new IntWritable(ii.next())) 
       .runTest(); 
    } 
    br.close(); 


} 
} 

Редуктор и относительные тесты:

public class MaxTemperatureReducer extends Reducer<Text,IntWritable,Text,IntWritable> { 

@Override 
public void reduce(Text key, Iterable<IntWritable> values, Context context) { 
    int maxValue = Integer.MIN_VALUE; 
    for (IntWritable value : values) { 
     maxValue = Math.max(value.get(), maxValue); 
    } 
    try { 
     context.write(key, new IntWritable(maxValue)); 
    } catch (IOException e) { 
     e.printStackTrace(); 
    } catch (InterruptedException e) { 
     e.printStackTrace(); 
    } 
} 

} 

public class MaxTemperatureReducerTest { 

@Test 
public void processRecord() { 

    new ReduceDriver<Text,IntWritable,Text,IntWritable>() 
      .withReducer(new MaxTemperatureReducer()) 
      .withInputKey(new Text("2012")) 
      .withInputValues(Arrays.asList(new IntWritable(5), new IntWritable(10))) 
      .withOutput(new Text("2012"), new IntWritable(10)) 
      .runTest(); 
} 
} 

Наконец класс Driver + тест:

public class MaxTemperatureDriver extends Configured implements Tool { 

@Override 
public int run(String[] args) throws Exception { 
    if (args.length != 2) { 
     System.err.printf("Usage: %s [generic options] <input> <output>\n", getClass().getSimpleName()); 
     ToolRunner.printGenericCommandUsage(System.err); 
     return -1; 
    } 

    Job job = new Job(getConf(), "Max Temperature"); 
    job.setJarByClass(getClass()); 

    FileInputFormat.addInputPath(job, new Path(args[0])); 
    FileOutputFormat.setOutputPath(job, new Path(args[1])); 

    job.setMapperClass(MaxTemperatureMapper.class); 
    job.setCombinerClass(MaxTemperatureReducer.class); 
    job.setReducerClass(MaxTemperatureReducer.class); 

    job.setOutputKeyClass(Text.class); 
    job.setOutputValueClass(Iterable.class); 

    return job.waitForCompletion(true) ? 0 : 1; 
} 

public static void main(String[] args) throws Exception { 
    int exitCode = ToolRunner.run(new MaxTemperatureDriver(), args); 
    System.exit(exitCode); 
} 
} 

public class MaxTemperatureDriverTest { 

@Test 
public void test() throws Exception { 
    Configuration conf = new Configuration(); 
    conf.set("fs.default.name", "file:///"); 
    conf.set("mapred.job.tracker", "local"); 

    Path input = new Path("file:////home/user/big-data/ncdc/"); 
    Path output = new Path("output"); 

    FileSystem fs = FileSystem.getLocal(conf); 
    fs.delete(output, true); 

    MaxTemperatureDriver driver = new MaxTemperatureDriver(); 
    driver.setConf(conf); 

    int exitCode = driver.run(new String[] { input.toString(), output.toString() }); 
    assertThat(exitCode, is(0)); 
} 
} 

бегаю весь процесс с помощью командной строки :

$> hadoop doop.MaxTemperatureDriver -fs file:/// -jt local ~/big-data/ncdc/ output 

и тест в MaxTemperatureDriverTest, но в обоих случаях я получил:

13/09/21 19:45:13 INFO mapred.MapTask: Processing split: file:/home/user/big-data/ncdc/023780-99999-2012:0+5337 
13/09/21 19:45:13 INFO mapred.MapTask: io.sort.mb = 100 
13/09/21 19:45:14 INFO mapred.MapTask: data buffer = 79691776/99614720 
13/09/21 19:45:14 INFO mapred.MapTask: record buffer = 262144/327680 
13/09/21 19:45:14 INFO mapred.LocalJobRunner: Map task executor complete. 
13/09/21 19:45:14 WARN mapred.LocalJobRunner: job_local462595973_0001 
java.lang.Exception: java.lang.NullPointerException 
    at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:354) 
Caused by: java.lang.NullPointerException 
    at org.apache.hadoop.io.serializer.SerializationFactory.getSerializer(SerializationFactory.java:73) 
    at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.<init>(MapTask.java:970) 
    at org.apache.hadoop.mapred.MapTask$NewOutputCollector.<init>(MapTask.java:673) 
    at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:756) 
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:364) 
    at org.apache.hadoop.mapred.LocalJobRunner$Job$MapTaskRunnable.run(LocalJobRunner.java:223) 
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:439) 
    at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303) 
    at java.util.concurrent.FutureTask.run(FutureTask.java:138) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908) 
    at java.lang.Thread.run(Thread.java:662) 

В «слишком общий», как он всегда возвращает исключение нуль-указатель, когда пытается разобрать файл «023780-99999- 2012" . Поэтому я написал для него тест (который вы можете увидеть в тестах mapper «processRecordsFromSuspiciousFile»), но он не возвращает ошибок. Я также проверил журналы без каких-либо успехов.

Это связано с неправильными или отсутствующими параметрами локального режима (числовые потоки, куча памяти и т. Д.)? или что-то не так в коде?

ответ

1

Hadoop не имеет представления из коробки, как сериализовать Iterable. Если вы действительно собираетесь использовать Iterable в качестве вашего класса выходных значений, вам также потребуется указать сериализатор для Iterable. Типичными типами ввода/вывода, используемыми с Hadoop, являются подклассы Writable.

Обновление: теперь я вижу, что вы используете IntWritable как свой класс выходных значений. Ваша проблема заключается этот драйвер линии:

job.setOutputValueClass(Iterable.class) 

должен быть

job.setOutputValueClass(IntWritable.class)  
+0

Спасибо! Да, я не видел его и не мог обнаружить в журналах и тестах :) – Randomize

Смежные вопросы