2015-09-09 2 views
0

У меня есть данные об авариях из анализа данных трафика. Некоторые из столбцов:Hadoop Вторичный сорт - использовать или не использовать

Accident Id, от несчастных случаев Дата, день недели

1, 1/1/1979, 5 (четверг)

2, 1/2/1979, 6 (пятница)

.......

3, 1/1/1980, 0 (воскресенье)

Я пытаюсь решить следующие:

Находит количество несчастных случаев в год в день

так результат должен выглядеть следующим образом:

где Ключ (год, день недели)

и стоимость = количество несчастных случаев в тот же день Здесь линии 1 представляет, год = 1979 день = воскресенье и количество несчастных случаев = 500 и т. Д.

1979,1  500 

1979,2 1500 

1979,3 2500 

1979,4 3500 

1979,5 4500 

1979,6 5500 

1979,7 6500 

1980,1  500 

1980,2 1500 

1980,3 2500 

1980,4 3500 

1980,5 4500 

В этом случае я пытаюсь решить его, используя метод вторичной сортировки. Это правильный способ решить эту проблему?

Если вторичная сортировка правильная, ее не работает для меня. Вот ключевой класс, картограф и редуктор. Но мой результат не соответствует ожиданиям. Пожалуйста, помогите ..

public class DOW implements WritableComparable<DOW> { 
    private Text year; 
    private Text day; 

    // private final Text count; 

    // private int count; 
    public DOW() { 
     this.year = new Text(); 
     this.day = new Text(); 
     // this.count = count; 
    } 

    public DOW(Text year, Text day) { 
     this.year = year; 
     this.day = day; 
     // this.count = count; 
    } 

    public Text getYear() { 
     return this.year; 
    } 

    public void setYear(Text year) { 
     this.year = year; 
    } 

    public Text getDay() { 
     return this.day; 
    } 

    public void setDay(Text day) { 
     this.day = day; 
    } 

    @Override 
    public void readFields(DataInput in) throws IOException { 
     // TODO Auto-generated method stub 
     year.readFields(in); 
     day.readFields(in); 

    } 

    @Override 
    public void write(DataOutput out) throws IOException { 
     // TODO Auto-generated method stub 
     year.write(out); 
     day.write(out); 
    } 

    @Override 
    public int compareTo(DOW o) { 
     // TODO Auto-generated method stub 
     int cmp = year.compareTo(o.year); 
     if (cmp != 0) { 
      return cmp; 
     } 
     return o.day.compareTo(this.day); 
    } 

    @Override 
    public String toString() { 
     // TODO Auto-generated method stub 
     return year + "," + day; 
    } 

    @Override 
    public boolean equals(Object o) { 
     // TODO Auto-generated method stub 
     if (o instanceof DOW) { 
      DOW tp = (DOW) o; 
      return year.equals(tp.year) && day.equals(tp.day); 
     } 
     return false; 
    } 

    @Override 
    public int hashCode() { 
     // TODO Auto-generated method stub 
     return year.hashCode() * 163 + day.hashCode(); 
    } 
} 
public class AccidentDowDemo extends Configured implements Tool { 

    public static class DOWMapper extends Mapper<LongWritable, Text, DOW, IntWritable> { 
     private static final Logger sLogger = Logger.getLogger(DOWMapper.class); 

     @Override 
     protected void map(LongWritable key, Text value, Context context) 
       throws java.io.IOException, InterruptedException { 

      if (value.toString().contains(",")) { 
       String[] array = value.toString().split(","); 
       if (!array[9].equals("Date")) { 
        Date dt = null; 
        try { 
         dt = new SimpleDateFormat("dd/mm/yyyy").parse(array[9]); 

        } catch (ParseException e) { 
         // TODO Auto-generated catch block 

         e.printStackTrace(); 
        } 

        int year = dt.getYear(); 

        int day = Integer.parseInt(array[10].toString()); 
             context.write(new DOW(new Text(Integer.toString(year)), 
          new Text(Integer.toString(day))), 
          new IntWritable(1)); 
       } 
      } 
     }; 
    } 

    public static class DOWReducer extends Reducer<DOW, IntWritable, DOW, IntWritable> { 
     private static final Logger sLogger = Logger 
       .getLogger(DOWReducer.class); 

     @Override 
     protected void reduce(DOW key, Iterable<IntWritable> values, 
       Context context) throws java.io.IOException, 
       InterruptedException { 
      int count = 0; 
      sLogger.info("key =" + key); 
      for (IntWritable x : values) { 
       int val = Integer.parseInt(x.toString()); 
       count = count + val; 
      } 
      context.write(key, new IntWritable(count)); 
     }; 
    } 

    public static class FirstPartitioner extends Partitioner<DOW, IntWritable> { 

     @Override 
     public int getPartition(DOW key, IntWritable value, int numPartitions) { 
      // TODO Auto-generated method stub 

      return Math.abs(Integer.parseInt(key.getYear().toString()) * 127) 
        % numPartitions; 
     } 
    } 

    public static class KeyComparator extends WritableComparator { 
     protected KeyComparator() { 
      super(DOW.class, true); 
     } 

     @Override 
     public int compare(WritableComparable w1, WritableComparable w2) { 
      // TODO Auto-generated method stub 

      DOW ip1 = (DOW) w1; 
      DOW ip2 = (DOW) w2; 
      int cmp = ip1.getYear().compareTo(ip2.getYear()); 
      if (cmp == 0) { 
       cmp = -1 * ip1.getDay().compareTo(ip2.getDay()); 
      } 
      return cmp; 
     } 
    } 

    public static class GroupComparator extends WritableComparator { 
     protected GroupComparator() { 
      super(DOW.class, true); 
     } 

     @Override 
     public int compare(WritableComparable w1, WritableComparable w2) { 

      // TODO Auto-generated method stub 
      DOW ip1 = (DOW) w1; 
      DOW ip2 = (DOW) w2; 
      return ip1.getYear().compareTo(ip2.getYear()); 
     } 
    } 
} 
+0

Просто для уточнения расчета количества дорожно-транспортных происшествий на ежегодной основе вы можете передать карту как и подвести итог в сторону редуктора и для расчета на ежедневной основе вы можете использовать mapper как <целая дата в MM/DD/YYYY, новый DoubleWritable (1)> и суммируйте значение со стороны редуктора. Вы подумали об этом и почему-то проигнорировали? и почему вы хотите взять недельные дни вместе с вашим ключом? –

+0

Привет, На самом деле я хочу, чтобы результат выглядел так: 1979, 1 500. Эта строка говорит мне, что в 1979 году по воскресеньям зарегистрировано 500 несчастных случаев. Так что за каждый год у меня будет 7 рядов, то есть 1 строка в день, и каждый ряд расскажет, сколько несчастных случаев в день недели. Надеюсь, что это очистит ваш вопрос. – DevHelp

ответ

0

Если вам нужно в основном имитировать

select year, day, count(*) as totalPerDay from DATA group by year, day 

чем вам не нужен вторичный вид.

Но если вам нужно произвести что-то вроде CUBE, где вам нужно рассчитать общее количество в год и общее количество в неделю в одной задаче MR, чем вторичная сортировка - это путь.

+0

Спасибо Алекс .. Ваш комментарий помог в решении проблемы. Я решил это двумя способами: 1) Без вторичной сортировки - я просто удалил разделитель, GroupComparater и sortComparator 2) Чтобы решить проблему со вторичной сортировкой, я сделал это изменение и хочу получить обратную связь, если я могу использовать оба ключа в GroupComparator: – DevHelp

+0

protected GroupComparator() { super (DOW.class, true); } @Override общественных INT сравнить (WritableComparable w 1, w 2 WritableComparable) {// TODO автоматическая генерация метод заглушки ДОУ IP1 = (ДОУ) w1; DOW ip2 = (DOW) w2; int cmp = ip1.getYear(). CompareTo (ip2.getYear()); if (cmp! = 0) { return cmp; } return ip1.getDay(). CompareTo (ip2.getDay()); }} – DevHelp

+0

Я только что отправил :(Да и есть, чтобы сделать это, чтобы получить результат –

0

Это более или менее вторичная сортировка, но это не так. Проблема с GroupComparator, сравнение должно проводиться как по годам, так и по дням. Идея groupcomparator заключается в том, чтобы убедиться, что тот же год входит в один и тот же редуктор, но здесь нам это не нужно, вместо этого данные должны входить в один и тот же редуктор, если он имеет тот же год и тот же день (в 1979 году и в воскресенье). Это должно выглядеть примерно так.

package accidentexercise; 

import org.apache.hadoop.io.WritableComparable; 
import org.apache.hadoop.io.WritableComparator; 

public class ClassGroupComparator extends WritableComparator 
{ 
    protected ClassGroupComparator() 
    { 
     super(TextpairWritable.class,true); 
    } 
    @SuppressWarnings("rawtypes") 
    public int compare(WritableComparable w,WritableComparable w1) 
    { 
     TextpairWritable s=(TextpairWritable)w; 
     TextpairWritable s1=(TextpairWritable)w1; 
     int cmp= s.year.compareTo(s1.year); 

      if(cmp==0) 
      { 
       cmp= -1*s.day.compareTo(s1.day); 
      } 
      return cmp; 
    } 
} 

Я вставляю весь свой код.

TextpairWritable: 

package accidentexercise; 

import java.io.DataInput; 
import java.io.DataOutput; 
import java.io.IOException; 

import org.apache.hadoop.io.Text; 
import org.apache.hadoop.io.WritableComparable; 

public class TextpairWritable implements WritableComparable<TextpairWritable> 
{ 
    Text year=new Text(); 
    Text day=new Text(); 
    public TextpairWritable() 
    { 
     this.year=new Text(); 
     this.day=new Text(); 
    } 
    public TextpairWritable(Text year,Text day) 
    { 
     this.year=year; 
     this.day=day; 
    } 

    public TextpairWritable(String year,String day) 
    { 
     this.year=new Text(year); 
     this.day=new Text(day); 
    } 
    public TextpairWritable(TextpairWritable o) 
    { 
     this.year=o.year; 
     this.day=o.day; 
    } 
    public void set(Text year,Text day) 
    { 
     this.year=year; 
     this.day=day; 
    } 
    public Text getyear() 
    { 
     return this.year; 
    } 
    public Text getday() 
    { 
     return this.day; 
    } 
    @Override 
    public void readFields(DataInput in) throws IOException { 
     year.readFields(in); 
     day.readFields(in); 
    } 

    @Override 
    public void write(DataOutput out) throws IOException { 
     year.write(out); 
     day.write(out); 
    } 

    public String toString() 
    { 
     return year+" "+day; 
    } 

    public int compareTo(TextpairWritable o) 
    { 
     int cmp=year.compareTo(day); 
     if(cmp==0) 
     { 
      cmp=day.compareTo(day); 
     } 
     return cmp; 
    } 
} 

GroupComparator: 


package accidentexercise; 

import org.apache.hadoop.io.WritableComparable; 
import org.apache.hadoop.io.WritableComparator; 

public class ClassGroupComparator extends WritableComparator 
{ 
    protected ClassGroupComparator() 
    { 
     super(TextpairWritable.class,true); 
    } 
    @SuppressWarnings("rawtypes") 
    public int compare(WritableComparable w,WritableComparable w1) 
    { 
     TextpairWritable s=(TextpairWritable)w; 
     TextpairWritable s1=(TextpairWritable)w1; 
     int cmp= s.year.compareTo(s1.year); 

      if(cmp==0) 
      { 
       cmp= -1*s.day.compareTo(s1.day); 
      } 
      return cmp; 
    } 
} 

SortComparator: 
package accidentexercise; 

import org.apache.hadoop.io.WritableComparable; 
import org.apache.hadoop.io.WritableComparator; 

public class ClassSortComparator extends WritableComparator 
{ 
    protected ClassSortComparator() 
    { 
     super(TextpairWritable.class,true); 
    } 
    @SuppressWarnings("rawtypes") 
    public int compare(WritableComparable w,WritableComparable w1) 
    { 
     TextpairWritable s=(TextpairWritable)w; 
     TextpairWritable s1=(TextpairWritable)w1; 
     int cmp=s.year.compareTo(s1.year); 
     if(cmp==0) 
     { 
      cmp= -1*s.day.compareTo(s1.day); 
     } 
     return cmp; 
    } 

} 
Mapper: 
package accidentexercise; 

import java.io.IOException; 
import java.text.ParseException; 
import java.text.SimpleDateFormat; 
import java.util.Date; 

import org.apache.hadoop.io.IntWritable; 
import org.apache.hadoop.io.LongWritable; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.mapreduce.Mapper; 
import org.slf4j.Logger; 
import org.slf4j.LoggerFactory; 

public class ClassMapper extends Mapper<LongWritable,Text,TextpairWritable,IntWritable> 
{ 
    public void map(LongWritable key,Text value,Context context) throws IOException,InterruptedException 
    { 

     Logger log=LoggerFactory.getLogger(ClassMapper.class) ; 
     String s=value.toString(); 
     String[] orig_data=s.split(","); 

     SimpleDateFormat df=new SimpleDateFormat("dd/MM/yyyy"); 
     df.setLenient(false); 
     try 
     { 
      @SuppressWarnings("unused") 
      Date date=df.parse(orig_data[0]); 
      String myyear=orig_data[0].substring(6, 10); 
      context.write(new TextpairWritable(new Text(myyear),new Text(orig_data[2])),new IntWritable(Integer.parseInt(orig_data[1]))); 
     } 
     catch(ParseException e) 
     { 

      log.info("Date is not correct"+e); 
     } 
    } 
} 
Reducer: 
package accidentexercise; 

import java.io.IOException; 

import org.apache.hadoop.io.IntWritable; 
import org.apache.hadoop.mapreduce.Reducer; 

public class ClassReducer extends Reducer<TextpairWritable,IntWritable,TextpairWritable,IntWritable> 
{ 
    public void reduce(TextpairWritable key,Iterable<IntWritable> value,Context context) throws IOException,InterruptedException 
    { 
     int count=0; 
     for(IntWritable it:value) 
     { 
      count+=it.get(); 
     } 
     context.write(key,new IntWritable(count)); 
    } 

} 
Driver: 
package accidentexercise; 


import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.io.IntWritable; 
import org.apache.hadoop.mapreduce.Job; 
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 


public class ClassDriver { 
    public static void main(String args[]) throws Exception 
    { 
     if(args.length!=2) 
      { 
      System.err.println("Usage: Worddrivernewapi <input path> <output path>"); 
      System.exit(-1); 
      } 
     Job job=new Job(); 

     job.setJarByClass(ClassDriver.class); 
     job.setJobName("MyDriver"); 

     FileInputFormat.addInputPath(job,new Path(args[0])); 
     FileOutputFormat.setOutputPath(job,new Path(args[1])); 

     job.setMapperClass(ClassMapper.class); 
     job.setPartitionerClass(ClassPartitioner.class); 
     job.setSortComparatorClass(ClassSortComparator.class); 
     job.setGroupingComparatorClass(ClassGroupComparator.class); 
     job.setReducerClass(ClassReducer.class); 
     //job.setNumReduceTasks(0); 

     job.setOutputKeyClass(TextpairWritable.class); 
     job.setOutputValueClass(IntWritable.class); 

     System.exit(job.waitForCompletion(true) ? 0 : 1); 

    } 

} 
Partitioner: 
package accidentexercise; 

import org.apache.hadoop.io.IntWritable; 

import org.apache.hadoop.mapreduce.Partitioner; 

public class ClassPartitioner extends Partitioner<TextpairWritable,IntWritable> 
{ 

    @Override 
    public int getPartition(TextpairWritable tp, IntWritable value, int numPartitions) { 

     return Math.abs(Integer.parseInt(tp.getyear().toString()) * 127) % numPartitions; 
    } 


} 

Пример ввода:

Дата, Number_of_accidents, день

01/03/2014,18,2

02/03/2014,19,3

03/03/2014,20,4

01/03/2014,1,2

02/03/2014,2,3

03/03/2014,4,4

01/03/2014,8,2

02/03/2014,9 , 3

03/03/2014,2,4

Выход:

01/03/2014,2,27

02/03/2014,3,30

03/03/2014,4,26

+1

создания DF (DF = SimpleDateFormat новый SimpleDateFormat («дд/мм/гггг») ;) внутри метода карты довольно дорого вычислительно, так как он должен скомпилировать идентификатор для каждой строки ввода, сделать его переменной-членом Mapper, это может сделать работу MR Job более быстрой. – alexeipab

+1

Writable являются изменяемыми объектами, и Hadoop всегда использует один и тот же экземпляр, например, вы можете проверить в отладчике, что он является тем же самым записываемым объектом, который передается в качестве аргументов для сопоставления и уменьшения методов, поэтому для java GC будет проще, если все доступные для записи, переданные в context.write, создаются полями членов Редуктора и Mapper. – alexeipab

+0

@alexeipab Спасибо за ваши комментарии. Получил первый. Должен был объявить его перед вызовом карты. Кстати, в спешке. Для второго комментария вы хотите сказать, что нужно создать отдельную переменную-член для нового Text (myyear), нового Text (orig_data [2])), нового IntWritable (Integer.parseInt (orig_data [1])), и эта переменная-член имеет для использования в контексте .write. –

Смежные вопросы