2014-02-09 2 views
0

У меня есть сценарий, где я хочу рассчитать снижение значения.Карта Уменьшить код для расчета скорости снижения

Мой входной файл является CSV в формате: Key, Value, Отметка

1,600,2014-01-20 10:20:00 
1,1200,2014-01-20 10:30:00 
... 
2,2400,2014-01-30 11:20:00 
2,3600,2014-01-30 11:30:00 
... 

Там может быть несколько ключей, и каждый ключ может иметь несколько значений и штамп времени записи.

Мне нужно рассчитать снижение значений для каждого ключа за период времени.

Decline = (V2-V1)/(t2-t1) 

Здесь время t в секундах.

Мой ожидается выход что-то вроде,

1,1 
... 
2,2 
... 

серии MR код, который я написал выглядит примерно так,

import java.io.IOException; 
import java.util.*; 
import java.text.SimpleDateFormat; 

import org.apache.hadoop.conf.Configuration; 
import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.io.*; 
import org.apache.hadoop.mapreduce.Mapper; 
import org.apache.hadoop.mapreduce.Reducer; 
import org.apache.hadoop.mapreduce.Job; 
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; 
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; 

public class TestMR 
{  
    public static class Map extends Mapper<LongWritable,Text,Text,Text> 
    { 
     public void map(LongWritable key, Text line, Context context) throws IOException, InterruptedException 
     { 
      String [] split = line.toString().split(","); 

      long t1 = 0; 
      SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); 
      try 
      { 
       t1 = df.parse(split[2]).getTime()/1000; 
      } 
      catch (java.text.ParseException e) 
      { 
       System.out.println("Unable to parse date string: " + split[2]); 
      } 

      StringBuffer sb = new StringBuffer(split[1]+","+t1); 

      context.write(new Text(split[0]), new Text(sb.toString())); 
     }   
    } 


    public static class Reduce extends Reducer<Text,Text,Text,Text> 
    { 
     public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException 
     { 
      Iterator iter = values.iterator(); 
      while(iter.hasNext()) 
      { 
       String [] tmpBuf_1 = iter.next().toString().split(","); 
       if(tmpBuf_1.length != 2) 
        continue; 
       String v1 = tmpBuf_1[0]; 
       double t1 = Double.parseDouble(tmpBuf_1[1]); 

       if(!iter.hasNext()) 
        break; 

       String [] tmpBuf_2 = iter.next().toString().split(",");  
       if(tmpBuf_2.length != 2) 
        continue; 
       String v2 = tmpBuf_2[0]; 
       double t2 = Double.parseDouble(tmpBuf_2[1]); 

       double vDiff = Double.parseDouble(v2) - Double.parseDouble(v1);  
       double tDiff = t2 - t1; 

       if(tDiff == 0) 
        continue; 

       double declineV = vDiff/tDiff; 

       context.write(key, new Text(String.valueOf(declineV))); 
      } 
     } 
    } 

    public static int main(String[] args) throws Exception 
    { 
     // Get the default configuration object 
     Configuration conf = new Configuration(); 

     // Add resources 
     conf.addResource("hdfs-default.xml"); 
     conf.addResource("hdfs-site.xml"); 
     conf.addResource("mapred-default.xml"); 
     conf.addResource("mapred-site.xml"); 
     conf.set("mapred.job.tracker", "local"); 

     Job job = new Job(conf); 
     job.setJobName("TestMR"); 
     job.setJarByClass(TestMR.class); 

     job.setMapOutputKeyClass(Text.class); 
     job.setMapOutputValueClass(Text.class); 

     job.setOutputKeyClass(Text.class); 
     job.setOutputValueClass(Text.class); 

     job.setMapperClass(Map.class); 
     job.setCombinerClass(Reduce.class); 
     job.setReducerClass(Reduce.class); 

     job.setInputFormatClass(TextInputFormat.class); 
     job.setOutputFormatClass(TextOutputFormat.class); 

     TextInputFormat.setInputPaths(job, new Path(args[0])); 
     TextOutputFormat.setOutputPath(job, new Path(args[1])); 

     // Set the jar file to run 
     job.setJarByClass(Example.class); 

     // Submit the job 
     Date startTime = new Date(); 
     System.out.println("Job started: " + startTime);  
     int exitCode = job.waitForCompletion(true) ? 0 : 1; 

     if(exitCode == 0) 
     {    
      Date end_time = new Date(); 
      System.out.println("Job ended: " + end_time); 
      System.out.println("The job took " + (end_time.getTime() - startTime.getTime())/1000 + " seconds.");      
     } 
     else { 
      System.out.println("Job Failed!!!"); 
     } 

     return exitCode; 
    } 
} 

я не получаю выход, когда я запускаю работу MR! Внизу команда след:

Job started: Sat Feb 08 16:36:07 PST 2014 
14/02/08 16:36:07 WARN conf.Configuration: session.id is deprecated. Instead, use dfs.metrics.session-id 
14/02/08 16:36:07 INFO jvm.JvmMetrics: Initializing JVM Metrics with processName=JobTracker, sessionId= 
14/02/08 16:36:07 WARN mapred.JobClient: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same. 
14/02/08 16:36:07 INFO input.FileInputFormat: Total input paths to process : 1 
14/02/08 16:36:08 INFO mapred.JobClient: Running job: job_local2110196638_0001 
14/02/08 16:36:08 INFO mapred.LocalJobRunner: OutputCommitter set in config null 
14/02/08 16:36:08 INFO mapred.LocalJobRunner: OutputCommitter is org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter 
14/02/08 16:36:08 INFO mapred.LocalJobRunner: Waiting for map tasks 
14/02/08 16:36:08 INFO mapred.LocalJobRunner: Starting task: attempt_local2110196638_0001_m_000000_0 
14/02/08 16:36:08 WARN mapreduce.Counters: Group org.apache.hadoop.mapred.Task$Counter is deprecated. Use org.apache.hadoop.mapreduce.TaskCounter instead 
14/02/08 16:36:08 INFO util.ProcessTree: setsid exited with exit code 0 
14/02/08 16:36:08 INFO mapred.Task: Using ResourceCalculatorPlugin : [email protected] 
14/02/08 16:36:08 INFO mapred.MapTask: Processing split: hdfs://localhost.localdomain:8020/user/cloudera/input.csv:0+33554432 
14/02/08 16:36:08 INFO mapred.MapTask: Map output collector class = org.apache.hadoop.mapred.MapTask$MapOutputBuffer 
14/02/08 16:36:08 INFO mapred.MapTask: io.sort.mb = 50 
14/02/08 16:36:08 INFO mapred.MapTask: data buffer = 39845888/49807360 
14/02/08 16:36:08 INFO mapred.MapTask: record buffer = 131072/163840 
14/02/08 16:36:09 INFO mapred.JobClient: map 0% reduce 0% 
In MAP!! 
245,1334603716 
14/02/08 16:36:14 INFO mapred.LocalJobRunner: 
14/02/08 16:36:15 INFO mapred.JobClient: map 9% reduce 0% 
14/02/08 16:36:16 INFO mapred.MapTask: Spilling map output: record full = true 
14/02/08 16:36:16 INFO mapred.MapTask: bufstart = 0; bufend = 2620494; bufvoid = 49807360 
14/02/08 16:36:16 INFO mapred.MapTask: kvstart = 0; kvend = 131072; length = 163840 
14/02/08 16:36:16 INFO compress.CodecPool: Got brand-new compressor [.snappy] 
In REDUCE!! 
14/02/08 16:36:17 INFO mapred.LocalJobRunner: 
14/02/08 16:36:17 INFO mapred.LocalJobRunner: 
14/02/08 16:36:17 INFO mapred.MapTask: Starting flush of map output 
14/02/08 16:36:18 INFO mapred.JobClient: map 49% reduce 0% 
14/02/08 16:36:18 INFO mapred.MapTask: Finished spill 0 
14/02/08 16:36:18 INFO mapred.MapTask: Finished spill 1 
14/02/08 16:36:18 INFO mapred.Merger: Merging 2 sorted segments 
14/02/08 16:36:18 INFO compress.CodecPool: Got brand-new decompressor [.snappy] 
14/02/08 16:36:18 INFO compress.CodecPool: Got brand-new decompressor [.snappy] 
14/02/08 16:36:18 INFO mapred.Merger: Down to the last merge-pass, with 2 segments left of total size: 339763 bytes 
14/02/08 16:36:19 INFO mapred.Task: Task:attempt_local2110196638_0001_m_000000_0 is done. And is in the process of commiting 
14/02/08 16:36:19 INFO mapred.LocalJobRunner: 
14/02/08 16:36:19 INFO mapred.Task: Task 'attempt_local2110196638_0001_m_000000_0' done. 
14/02/08 16:36:19 INFO mapred.LocalJobRunner: Finishing task: attempt_local2110196638_0001_m_000000_0 
14/02/08 16:36:19 INFO mapred.LocalJobRunner: Starting task: attempt_local2110196638_0001_m_000001_0 
14/02/08 16:36:19 WARN mapreduce.Counters: Group org.apache.hadoop.mapred.Task$Counter is deprecated. Use org.apache.hadoop.mapreduce.TaskCounter instead 
14/02/08 16:36:19 INFO mapred.Task: Using ResourceCalculatorPlugin : [email protected] 
14/02/08 16:36:19 INFO mapred.MapTask: Processing split: hdfs://localhost.localdomain:8020/user/cloudera/input.csv:33554432+13261402 
14/02/08 16:36:19 INFO mapred.MapTask: Map output collector class = org.apache.hadoop.mapred.MapTask$MapOutputBuffer 
14/02/08 16:36:19 INFO mapred.MapTask: io.sort.mb = 50 
14/02/08 16:36:19 INFO mapred.MapTask: data buffer = 39845888/49807360 
14/02/08 16:36:19 INFO mapred.MapTask: record buffer = 131072/163840 
14/02/08 16:36:20 INFO mapred.JobClient: map 50% reduce 0% 
14/02/08 16:36:20 INFO mapred.LocalJobRunner: 
14/02/08 16:36:20 INFO mapred.MapTask: Starting flush of map output 
14/02/08 16:36:20 INFO mapred.MapTask: Finished spill 0 
14/02/08 16:36:20 INFO mapred.Task: Task:attempt_local2110196638_0001_m_000001_0 is done. And is in the process of commiting 
14/02/08 16:36:20 INFO mapred.LocalJobRunner: 
14/02/08 16:36:20 INFO mapred.Task: Task 'attempt_local2110196638_0001_m_000001_0' done. 
14/02/08 16:36:20 INFO mapred.LocalJobRunner: Finishing task: attempt_local2110196638_0001_m_000001_0 
14/02/08 16:36:20 INFO mapred.LocalJobRunner: Map task executor complete. 
14/02/08 16:36:20 WARN mapreduce.Counters: Group org.apache.hadoop.mapred.Task$Counter is deprecated. Use org.apache.hadoop.mapreduce.TaskCounter instead 
14/02/08 16:36:20 INFO mapred.Task: Using ResourceCalculatorPlugin : [email protected] 
14/02/08 16:36:20 INFO mapred.LocalJobRunner: 
14/02/08 16:36:20 INFO mapred.Merger: Merging 2 sorted segments 
14/02/08 16:36:20 INFO mapred.Merger: Down to the last merge-pass, with 2 segments left of total size: 496064 bytes 
14/02/08 16:36:20 INFO mapred.LocalJobRunner: 
14/02/08 16:36:21 INFO mapred.Task: Task:attempt_local2110196638_0001_r_000000_0 is done. And is in the process of commiting 
14/02/08 16:36:21 INFO mapred.LocalJobRunner: 
14/02/08 16:36:21 INFO mapred.Task: Task attempt_local2110196638_0001_r_000000_0 is allowed to commit now 
14/02/08 16:36:21 INFO output.FileOutputCommitter: Saved output of task 'attempt_local2110196638_0001_r_000000_0' to /user/cloudera/output 
14/02/08 16:36:21 INFO mapred.LocalJobRunner: reduce > reduce 
14/02/08 16:36:21 INFO mapred.Task: Task 'attempt_local2110196638_0001_r_000000_0' done. 
14/02/08 16:36:21 INFO mapred.JobClient: map 100% reduce 100% 
14/02/08 16:36:21 INFO mapred.JobClient: Job complete: job_local2110196638_0001 
14/02/08 16:36:21 INFO mapred.JobClient: Counters: 25 
14/02/08 16:36:21 INFO mapred.JobClient: File System Counters 
14/02/08 16:36:21 INFO mapred.JobClient:  FILE: Number of bytes read=1541573 
14/02/08 16:36:21 INFO mapred.JobClient:  FILE: Number of bytes written=2668157 
14/02/08 16:36:21 INFO mapred.JobClient:  FILE: Number of read operations=0 
14/02/08 16:36:21 INFO mapred.JobClient:  FILE: Number of large read operations=0 
14/02/08 16:36:21 INFO mapred.JobClient:  FILE: Number of write operations=0 
14/02/08 16:36:21 INFO mapred.JobClient:  HDFS: Number of bytes read=127382708 
14/02/08 16:36:21 INFO mapred.JobClient:  HDFS: Number of bytes written=0 
14/02/08 16:36:21 INFO mapred.JobClient:  HDFS: Number of read operations=17 
14/02/08 16:36:21 INFO mapred.JobClient:  HDFS: Number of large read operations=0 
14/02/08 16:36:21 INFO mapred.JobClient:  HDFS: Number of write operations=4 
14/02/08 16:36:21 INFO mapred.JobClient: Map-Reduce Framework 
14/02/08 16:36:21 INFO mapred.JobClient:  Map input records=419661 
14/02/08 16:36:21 INFO mapred.JobClient:  Map output records=202114 
14/02/08 16:36:21 INFO mapred.JobClient:  Map output bytes=4041067 
14/02/08 16:36:21 INFO mapred.JobClient:  Input split bytes=292 
14/02/08 16:36:21 INFO mapred.JobClient:  Combine input records=202114 
14/02/08 16:36:21 INFO mapred.JobClient:  Combine output records=95846 
14/02/08 16:36:21 INFO mapred.JobClient:  Reduce input groups=43 
14/02/08 16:36:21 INFO mapred.JobClient:  Reduce shuffle bytes=0 
14/02/08 16:36:21 INFO mapred.JobClient:  Reduce input records=95846 
14/02/08 16:36:21 INFO mapred.JobClient:  Reduce output records=0 
14/02/08 16:36:21 INFO mapred.JobClient:  Spilled Records=259510 
14/02/08 16:36:21 INFO mapred.JobClient:  CPU time spent (ms)=0 
14/02/08 16:36:21 INFO mapred.JobClient:  Physical memory (bytes) snapshot=0 
14/02/08 16:36:21 INFO mapred.JobClient:  Virtual memory (bytes) snapshot=0 
14/02/08 16:36:21 INFO mapred.JobClient:  Total committed heap usage (bytes)=376516608 
Job ended: Sat Feb 08 16:36:21 PST 2014 
The job took 13 seconds. 

Единственное, что я мог видеть, что Уменьш происходит до того, как карта работа завершена.

Считаете ли вы, что это могло вызвать проблему?

Если ДА, есть ли способ сказать «Уменьшить», чтобы дождаться завершения карты в первую очередь?

Если НЕТ, что может пойти не так в вышеуказанном коде?

+0

Это нормально, когда редуктор «начинает» перед преобразователем. Это не вычисление, а копирование с завершенных карт. –

ответ

0

(EDIT: убрана неправильное объяснение ранее)

Вы подаете Reduce как объединителя и редуктора. Комбинированный бит работает, но выход возвращается обратно в тот же класс, где все не имеет 2 столбца, поэтому каждая строка пропускается. Вы не можете применять это как объединитель.

Этот код также зависит от просмотра событий в отсортированном порядке по времени, но ничего о том, как оно построено, похоже, гарантирует это.

(У вас есть несколько незначительных странных вещей здесь, как бессмысленная StringBuffer (который должен быть StringBuilder так или иначе), продолжая неправильно после исключения, не импортируя ParseException и разборе долго, как двойной)

Смежные вопросы