Я запускаю два задания MapReduce подряд. Результат первого задания MapReduce используется как вход для следующего задания MapReduce. Для этого я задал job.setOutputFormatClass(SequenceFileOutputFormat.class). При запуске следующего класса Driver возникает исключение: java.lang.NullPointerException at org.apache.hadoop.mapreduce.lib.input.SequenceFileRecordReader.close
package org;
import org.apache.commons.configuration.ConfigurationFactory;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.mahout.math.VarLongWritable;
import org.apache.mahout.math.VectorWritable;
public class Driver1 extends Configured implements Tool
{
\t
\t \t public int run(String[] args) throws Exception
\t \t {
\t \t
\t \t if(args.length !=3) {
\t \t System.err.println("Usage: MaxTemperatureDriver <input path> <outputpath>");
\t \t System.exit(-1);
\t \t }
\t \t //ConfFactory WorkFlow=new ConfFactory(new Path("/input.txt"),new Path("/output.txt"),TextInputFormat.class,VarLongWritable.class,Text.class,VarLongWritable.class,VectorWritable.class,SequenceFileOutputFormat.class);
\t \t Job job = new Job();
\t \t Job job1=new Job();
\t \t job.setJarByClass(Driver1.class);
\t \t job.setJobName("Max Temperature");
\t
\t \t FileInputFormat.addInputPath(job, new Path(args[0]));
\t \t FileOutputFormat.setOutputPath(job,new Path(args[1]));
\t \t
\t \t job.setMapperClass(UserVectorMapper.class);
\t \t job.setReducerClass(UserVectorReducer.class);
\t \t
\t \t job.setOutputKeyClass(VarLongWritable.class);
\t \t job.setOutputValueClass(VectorWritable.class);
\t \t job.setOutputFormatClass(SequenceFileOutputFormat.class);
\t \t
\t \t job1.setJarByClass(Driver1.class);
\t \t //job.setJobName("Max Temperature");
\t \t job1.setInputFormatClass(SequenceFileInputFormat.class);
\t
\t \t FileInputFormat.addInputPath(job1, new Path("output/part-r-00000"));
\t \t FileOutputFormat.setOutputPath(job1,new Path(args[2]));
\t \t
\t \t job1.setMapperClass(ItemToItemPrefMapper.class);
\t \t //job1.setReducerClass(UserVectorReducer.class);
\t \t
\t \t job1.setOutputKeyClass(VectorWritable.class);
\t \t job1.setOutputValueClass(VectorWritable.class);
\t \t job1.setOutputFormatClass(SequenceFileOutputFormat.class);
\t \t System.exit(job.waitForCompletion(true) && job1.waitForCompletion(true) ? 0:1);
\t \t boolean success = job.waitForCompletion(true);
\t \t return success ? 0 : 1;
\t \t
\t \t }
\t \t public static void main(String[] args) throws Exception {
\t \t Driver1 driver = new Driver1();
\t \t int exitCode = ToolRunner.run(driver, args);
\t \t System.exit(exitCode);
\t \t }
\t \t }
Я получаю следующий журнал выполнения.
15/02/24 20:00:49 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
15/02/24 20:00:49 WARN mapred.JobClient: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
15/02/24 20:00:49 WARN mapred.JobClient: No job jar file set. User classes may not be found. See JobConf(Class) or JobConf#setJar(String).
15/02/24 20:00:49 INFO input.FileInputFormat: Total input paths to process : 1
15/02/24 20:00:49 WARN snappy.LoadSnappy: Snappy native library not loaded
15/02/24 20:00:49 INFO mapred.JobClient: Running job: job_local1723586736_0001
15/02/24 20:00:49 INFO mapred.LocalJobRunner: Waiting for map tasks
15/02/24 20:00:49 INFO mapred.LocalJobRunner: Starting task: attempt_local1723586736_0001_m_000000_0
15/02/24 20:00:49 INFO util.ProcessTree: setsid exited with exit code 0
15/02/24 20:00:49 INFO mapred.Task: Using ResourceCalculatorPlugin : [email protected]
15/02/24 20:00:49 INFO mapred.MapTask: Processing split: file:/home/smaiti/workspace/recommendationsy/data.txt:0+1979173
15/02/24 20:00:50 INFO mapred.MapTask: io.sort.mb = 100
15/02/24 20:00:50 INFO mapred.MapTask: data buffer = 79691776/99614720
15/02/24 20:00:50 INFO mapred.MapTask: record buffer = 262144/327680
15/02/24 20:00:50 INFO mapred.JobClient: map 0% reduce 0%
15/02/24 20:00:50 INFO mapred.MapTask: Starting flush of map output
15/02/24 20:00:51 INFO mapred.MapTask: Finished spill 0
15/02/24 20:00:51 INFO mapred.Task: Task:attempt_local1723586736_0001_m_000000_0 is done. And is in the process of commiting
15/02/24 20:00:51 INFO mapred.LocalJobRunner:
15/02/24 20:00:51 INFO mapred.Task: Task 'attempt_local1723586736_0001_m_000000_0' done.
15/02/24 20:00:51 INFO mapred.LocalJobRunner: Finishing task: attempt_local1723586736_0001_m_000000_0
15/02/24 20:00:51 INFO mapred.LocalJobRunner: Map task executor complete.
15/02/24 20:00:51 INFO mapred.Task: Using ResourceCalculatorPlugin : [email protected]
15/02/24 20:00:51 INFO mapred.LocalJobRunner:
15/02/24 20:00:51 INFO mapred.Merger: Merging 1 sorted segments
15/02/24 20:00:51 INFO mapred.Merger: Down to the last merge-pass, with 1 segments left of total size: 2074779 bytes
15/02/24 20:00:51 INFO mapred.LocalJobRunner:
15/02/24 20:00:51 INFO mapred.Task: Task:attempt_local1723586736_0001_r_000000_0 is done. And is in the process of commiting
15/02/24 20:00:51 INFO mapred.LocalJobRunner:
15/02/24 20:00:51 INFO mapred.Task: Task attempt_local1723586736_0001_r_000000_0 is allowed to commit now
15/02/24 20:00:51 INFO output.FileOutputCommitter: Saved output of task 'attempt_local1723586736_0001_r_000000_0' to output
15/02/24 20:00:51 INFO mapred.LocalJobRunner: reduce > reduce
15/02/24 20:00:51 INFO mapred.Task: Task 'attempt_local1723586736_0001_r_000000_0' done.
15/02/24 20:00:51 INFO mapred.JobClient: map 100% reduce 100%
15/02/24 20:00:51 INFO mapred.JobClient: Job complete: job_local1723586736_0001
15/02/24 20:00:51 INFO mapred.JobClient: Counters: 20
15/02/24 20:00:51 INFO mapred.JobClient: File Output Format Counters
15/02/24 20:00:51 INFO mapred.JobClient: Bytes Written=1012481
15/02/24 20:00:51 INFO mapred.JobClient: File Input Format Counters
15/02/24 20:00:51 INFO mapred.JobClient: Bytes Read=1979173
15/02/24 20:00:51 INFO mapred.JobClient: FileSystemCounters
15/02/24 20:00:51 INFO mapred.JobClient: FILE_BYTES_READ=6033479
15/02/24 20:00:51 INFO mapred.JobClient: FILE_BYTES_WRITTEN=5264031
15/02/24 20:00:51 INFO mapred.JobClient: Map-Reduce Framework
15/02/24 20:00:51 INFO mapred.JobClient: Reduce input groups=943
15/02/24 20:00:51 INFO mapred.JobClient: Map output materialized bytes=2074783
15/02/24 20:00:51 INFO mapred.JobClient: Combine output records=0
15/02/24 20:00:51 INFO mapred.JobClient: Map input records=100000
15/02/24 20:00:51 INFO mapred.JobClient: Reduce shuffle bytes=0
15/02/24 20:00:51 INFO mapred.JobClient: Physical memory (bytes) snapshot=0
15/02/24 20:00:51 INFO mapred.JobClient: Reduce output records=943
15/02/24 20:00:51 INFO mapred.JobClient: Spilled Records=200000
15/02/24 20:00:51 INFO mapred.JobClient: Map output bytes=1874777
15/02/24 20:00:51 INFO mapred.JobClient: Total committed heap usage (bytes)=415760384
15/02/24 20:00:51 INFO mapred.JobClient: CPU time spent (ms)=0
15/02/24 20:00:51 INFO mapred.JobClient: Virtual memory (bytes) snapshot=0
15/02/24 20:00:51 INFO mapred.JobClient: SPLIT_RAW_BYTES=118
15/02/24 20:00:51 INFO mapred.JobClient: Map output records=100000
15/02/24 20:00:51 INFO mapred.JobClient: Combine input records=0
15/02/24 20:00:51 INFO mapred.JobClient: Reduce input records=100000
15/02/24 20:00:51 WARN mapred.JobClient: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
15/02/24 20:00:51 WARN mapred.JobClient: No job jar file set. User classes may not be found. See JobConf(Class) or JobConf#setJar(String).
15/02/24 20:00:51 INFO input.FileInputFormat: Total input paths to process : 1
15/02/24 20:00:51 INFO mapred.JobClient: Running job: job_local735350013_0002
15/02/24 20:00:51 INFO mapred.LocalJobRunner: Waiting for map tasks
15/02/24 20:00:51 INFO mapred.LocalJobRunner: Starting task: attempt_local735350013_0002_m_000000_0
15/02/24 20:00:51 INFO mapred.Task: Using ResourceCalculatorPlugin : [email protected]
15/02/24 20:00:51 INFO mapred.MapTask: Processing split: file:/home/smaiti/workspace/recommendationsy/output/part-r-00000:0+1004621
15/02/24 20:00:51 INFO mapred.MapTask: io.sort.mb = 100
15/02/24 20:00:51 INFO mapred.MapTask: data buffer = 79691776/99614720
15/02/24 20:00:51 INFO mapred.MapTask: record buffer = 262144/327680
15/02/24 20:00:51 INFO mapred.MapTask: Ignoring exception during close for [email protected]
java.lang.NullPointerException
\t at org.apache.hadoop.mapreduce.lib.input.SequenceFileRecordReader.close(SequenceFileRecordReader.java:101)
\t at org.apache.hadoop.mapred.MapTask$NewTrackingRecordReader.close(MapTask.java:496)
\t at org.apache.hadoop.mapred.MapTask.closeQuietly(MapTask.java:1776)
\t at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:778)
\t at org.apache.hadoop.mapred.MapTask.run(MapTask.java:364)
\t at org.apache.hadoop.mapred.LocalJobRunner$Job$MapTaskRunnable.run(LocalJobRunner.java:223)
\t at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
\t at java.util.concurrent.FutureTask.run(FutureTask.java:262)
\t at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
\t at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
\t at java.lang.Thread.run(Thread.java:745)
15/02/24 20:00:51 INFO mapred.LocalJobRunner: Map task executor complete.
15/02/24 20:00:51 WARN mapred.LocalJobRunner: job_local735350013_0002
java.lang.Exception: java.lang.ClassCastException: class org.apache.mahout.math.VectorWritable
\t at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:354)
Caused by: java.lang.ClassCastException: class org.apache.mahout.math.VectorWritable
\t at java.lang.Class.asSubclass(Class.java:3208)
\t at org.apache.hadoop.mapred.JobConf.getOutputKeyComparator(JobConf.java:795)
\t at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.<init>(MapTask.java:964)
\t at org.apache.hadoop.mapred.MapTask$NewOutputCollector.<init>(MapTask.java:673)
\t at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:756)
\t at org.apache.hadoop.mapred.MapTask.run(MapTask.java:364)
\t at org.apache.hadoop.mapred.LocalJobRunner$Job$MapTaskRunnable.run(LocalJobRunner.java:223)
\t at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
\t at java.util.concurrent.FutureTask.run(FutureTask.java:262)
\t at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
\t at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
\t at java.lang.Thread.run(Thread.java:745)
15/02/24 20:00:52 INFO mapred.JobClient: map 0% reduce 0%
15/02/24 20:00:52 INFO mapred.JobClient: Job complete: job_local735350013_0002
15/02/24 20:00:52 INFO mapred.JobClient: Counters: 0
Первое исключение, которое я получаю: java.lang.NullPointerException в org.apache.hadoop.mapreduce.lib.input.SequenceFileRecordReader.close (SequenceFileRecordReader.java:101)
Пожалуйста, помогите.
Возможно, это проблема с вашим входным файлом. Вы пытались запустить только первые строки вашего входного файла? –
Вы решили эту проблему? У меня такая же проблема. В вашем случае я вижу, что входным путём служит выход другого задания MR. Использует ли ваш код файлы _SUCCESS? – Dmitry