2013-08-22 8 views
1

Я использую Hadoop для анализа данных GSOD (ftp://ftp.ncdc.noaa.gov/pub/data/gsod/). Я выбрал 5 лет для выполнения моих экспериментов (2005 - 2009). Я настроил небольшой кластер и выполнил простую программу MapReduce, которая получает максимальную температуру, зарегистрированную на год.Hadoop трудный с составным ключом

Теперь мне нужно создать новую программу MR, которая учитывает для каждой станции все явления, происходящие все эти годы.

файлы, которые я должен проанализировать иметь такую ​​структуру:

STN--- ... FRSHTO 
722115  110001 
722115  011001 
722110  111000 
722110  001000 
722000  001000 

Столбец STN означает код станции и FRSHTT означает явление: F - Туман, R - Дождь или морось, S - снег или ледяные пеллеты, H - Hail, T - Thunder, O - торнадо или воронкообразное облако.

Значение 1 означает, что это явление произошло в этот день; 0, означает, что не было.

Мне нужно найти результаты, как следующее:

722115: F = 1, R = 2, S = 1, O = 2 
722110: F = 1, R = 1, S = 2 
722000: S = 1 

Я мог бы запустить программу MR, но результаты не так, давая мне эти результаты:

722115 F, 1 
722115 R, 1 
722115 R, 1 
722115 S, 1 
722115 O, 1 
722115 O, 1 
722110 F, 1 
722110 R, 1 
722110 S, 1 
722110 S, 1 
722000 S, 1 

я использовал эти коды:

Mapper.java

public class Mapper extends org.apache.hadoop.mapreduce.Mapper<LongWritable, Text, StationPhenomenun, IntWritable> { 
@Override 
protected void map(LongWritable key, Text value, org.apache.hadoop.mapreduce.Mapper.Context context) throws IOException, InterruptedException { 
    String line = value.toString(); 
    // Every file starts with a field description line, so, I ignore this line 
    if (!line.startsWith("STN---")) { 
     // First field of the line means the station code where data was collected 
     String station = line.substring(0, 6); 
     String fog = (line.substring(132, 133)); 
     String rainOrDrizzle = (line.substring(133, 134)); 
     String snowOrIcePellets = (line.substring(134, 135)); 
     String hail = (line.substring(135, 136)); 
     String thunder = (line.substring(136, 137)); 
     String tornadoOrFunnelCloud = (line.substring(137, 138)); 

     if (fog.equals("1")) 
      context.write(new StationPhenomenun(station,"F"), new IntWritable(1)); 
     if (rainOrDrizzle.equals("1")) 
      context.write(new StationPhenomenun(station,"R"), new IntWritable(1)); 
     if (snowOrIcePellets.equals("1")) 
      context.write(new StationPhenomenun(station,"S"), new IntWritable(1)); 
     if (hail.equals("1")) 
      context.write(new StationPhenomenun(station,"H"), new IntWritable(1)); 
     if (thunder.equals("1")) 
      context.write(new StationPhenomenun(station,"T"), new IntWritable(1)); 
     if (tornadoOrFunnelCloud.equals("1")) 
      context.write(new StationPhenomenun(station,"O"), new IntWritable(1)); 
    } 
} 
} 

Reducer.java

public class Reducer extends org.apache.hadoop.mapreduce.Reducer<StationPhenomenun, IntWritable, StationPhenomenun, IntWritable> { 

protected void reduce(StationPhenomenun key, Iterable<IntWritable> values, org.apache.hadoop.mapreduce.Reducer.Context context) throws IOException, InterruptedException { 
int count = 0;   
    for (IntWritable value : values) { 
     count++; 
    } 

    String station = key.getStation().toString(); 
    String occurence = key.getPhenomenun().toString(); 

    StationPhenomenun textPair = new StationPhenomenun(station, occurence); 
    context.write(textPair, new IntWritable(count)); 
} 
} 

StationPhenomenum.java

public class StationPhenomenun implements WritableComparable<StationPhenomenun> { 
private String station; 
private String phenomenun; 
public StationPhenomenun(String station, String phenomenun) { 
    this.station = station; 
    this.phenomenun = phenomenun; 
} 
public StationPhenomenun() { 
} 
public String getStation() { 
    return station; 
} 
public String getPhenomenun() { 
    return phenomenun; 
} 
@Override 
public void readFields(DataInput in) throws IOException { 
    station = in.readUTF(); 
    phenomenun = in.readUTF(); 
} 
@Override 
public void write(DataOutput out) throws IOException { 
    out.writeUTF(station); 
    out.writeUTF(phenomenun); 
} 
@Override 
public int compareTo(StationPhenomenun t) { 
    int cmp = this.station.compareTo(t.station); 
    if (cmp != 0) { 
     return cmp; 
    } 
    return this.phenomenun.compareTo(t.phenomenun); 
}  
@Override 
public boolean equals(Object obj) { 
    if (obj == null) { 
     return false; 
    } 
    if (getClass() != obj.getClass()) { 
     return false; 
    } 
    final StationPhenomenun other = (StationPhenomenun) obj; 
    if (this.station != other.station && (this.station == null || !this.station.equals(other.station))) { 
     return false; 
    } 
    if (this.phenomenun != other.phenomenun && (this.phenomenun == null || !this.phenomenun.equals(other.phenomenun))) { 
     return false; 
    } 
    return true; 
} 
@Override 
public int hashCode() { 
    return this.station.hashCode() * 163 + this.phenomenun.hashCode(); 
} 
} 

NcdcJob.java

public class NcdcJob { 
public static void main(String[] args) throws Exception { 
    Configuration conf = new Configuration(); 
    Job job = new Job(conf); 
    job.setJarByClass(NcdcJob.class); 
    FileInputFormat.addInputPath(job, new Path("/user/hadoop/input")); 
    FileOutputFormat.setOutputPath(job, new Path("/user/hadoop/station")); 
    job.setMapperClass(Mapper.class); 
    job.setReducerClass(Reducer.class); 
    job.setMapOutputKeyClass(StationPhenomenun.class); 
    job.setMapOutputValueClass(IntWritable.class); 
    job.setOutputKeyClass(StationPhenomenun.class); 
    job.setOutputValueClass(IntWritable.class); 
    System.exit(job.waitForCompletion(true) ? 0 : 1); 
} 
} 

Кто-нибудь делал что-то подобное?

PS: Я пробовал это решение (Hadoop - composite key), но не работал для меня.

+0

Можете ли вы подробнее остановиться на '', но результаты неверны '' - каким образом? –

+0

Я отредактировал сообщение. Спасибо за внимание. – murilomsm

ответ

1

Просто проверьте, соответствуют ли следующие 2 класса вашей пользовательской реализации.

job.setMapperClass(Mapper.class); 
job.setReducerClass(Reducer.class); 

я смог получить желаемый результат с изменением следующих

protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { 

protected void reduce(StationPhenomenun key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { 

также изменили имена классов MyMapper и MyReducer

722115,1,1,0,0,0,1 
722115,0,1,1,0,0,1 
722110,1,1,1,0,0,0 
722110,0,0,1,0,0,0 
722000,0,0,1,0,0,0 

Для этого входного набора, я мог бы получить следующий результат:

StationPhenomenun [station=722000, phenomenun=S] 1 
StationPhenomenun [station=722110, phenomenun=F] 1 
StationPhenomenun [station=722110, phenomenun=R] 1 
StationPhenomenun [station=722110, phenomenun=S] 2 
StationPhenomenun [station=722115, phenomenun=F] 1 
StationPhenomenun [station=722115, phenomenun=O] 2 
StationPhenomenun [station=722115, phenomenun=R] 2 
StationPhenomenun [station=722115, phenomenun=S] 1 

Вычисление такое же, вам просто нужно настроить, как выводится вывод.

+1

Я не могу поверить, что решение просто изменяет отображение карты и сводит методы: «org.apache.hadoop.mapreduce.Reducer.Context» в «Контекст». Теперь он работает! Большое спасибо. Я скоро вернусь. – murilomsm

+0

Продолжить @ murilomsm. Удачи. –

Смежные вопросы