2014-11-27 3 views
0

Я запускаю код Mapreduce для кластера Hadoop Multi-Node Cluster (2.4.1). Я получаю ошибку превышения верхнего предела GC, когда я пытаюсь запустить с 2 входными файлами размером 200 МБ и 200 МБ. Это работает отлично и получает правильный вывод, когда я использую очень маленькие файлы.
Моя цель - сравнить каждую запись потока в 1-м файле с каждой записью потока во втором файле и вычислить расстояние, затем взять 10 максимальных значений и вывести их на редуктор на основе этих 10 макс. значения.

Пример записи потока в обоих файлах - 194.144.0.27 | 192.168.1.5 | 0.0.0.0 | 0 | 0 | 2 | 104 | 1410985350 | 1410985350 | 51915 | 51413 | 6 | 6
Mapreduce в Hadoop дает превышение верхнего предела GC при использовании файлов более 200 МБ

несколько снимков: http://goo.gl/5tUhJJ и http://goo.gl/lh1Qvm

Вот Mapper Класс:

Mapper Класс:

public class mapper extends Mapper<LongWritable, Text, Text, IntWritable> 
{ 

private final static IntWritable five = new IntWritable(5); 

private Text counter1; 

ArrayList<String> lines = new ArrayList<String>(); 
String str; 
BufferedReader br,in; 
int ddos_line = 0; 
int normal_line = 0,total_testing_records=4000; 
int K = 10; 

    @Override 
    protected void setup(Context context) throws IOException, InterruptedException 
    { 
    //BufferedReader in = new BufferedReader(new FileReader("normal")); 

     Configuration conf = context.getConfiguration();   
     URI[] cachefiles = context.getCacheFiles(); 

     FileSystem fs = FileSystem.get(new Configuration());   
     FileStatus[] status = fs.listStatus(new Path(cachefiles[0].toString()));    
     BufferedReader in=new BufferedReader(new InputStreamReader(fs.open(status[0].getPath()))); 


     while((str = in.readLine()) != null) 
     { 
      lines.add(str); 
     } 
     in.close(); 
     //System.out.println("na netti"); 
    } 

@Override 
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException 
{ 

    String line1 = value.toString(); 
    ddos_line++; 
    normal_line = 0; 

    double[] count = {-1, -1, -1, -1, -1, -1, -1, -1, -1, -1}; 
    int[] lineIndex = {0, 0, 0, 0, 0, 0, 0, 0, 0, 0}; 

    String[] parts = line1.split("\\|"); 
    String[] linesArray = lines.toArray(new String[lines.size()]); 

    boolean bool = true; 
    int t1=0; 
    double sum=0; 
    while (bool) 
    { 
     for(int i=0; i<K;i++) 
     { 
       if(bool==false) break; 
       sum = 0; 
       String[] parts2 = linesArray[normal_line].split("\\|"); 

       for(int x=0;x<13;x++) 
        { 
          if(parts[x].equals(parts2[x])) 
          { 
           t1 = 1; 
          } 
          else t1 = 0; 

          sum += t1; 
        } 

        sum = Math.sqrt(sum); 

        if(count[K-1] <= sum) 
        { 
         count[K-1] = sum; 
         lineIndex[K-1]=normal_line; 
        } 



        for(int k=0;k<K;k++) 
        { 
         for(int j=0;j<K-1;j++) 
         { 
          if(count[j] < count[j+1]) 
          { 
           double temp2 = count[j+1]; 
           count[j+1] = count[j]; 
           count[j] = temp2; 

           int temp3 = lineIndex[j+1]; 
           lineIndex[j+1] = lineIndex[j]; 
           lineIndex[j] = temp3; 
          } 
         } 
        } 

       //System.out.println(ddos_line + " " + normal_line); 
       if (normal_line + 1 < linesArray.length) 
       { 
        normal_line++; 
        continue; 
       } 

       else bool = false; 
      } 


    } // while end 

    char[] t = {'d','d','d','d','d','d','d','d','d','d'}; 
    for(int i=0;i<K;i++) 
    { 
     if(lineIndex[i] <= total_testing_records/2) t[i] = 'n'; 
    } 

    int counter_normal=0, counter_ddos=0; 
    for(int i=0;i<K;i++) 
    { 
     if(t[i]=='n') 
      counter_normal++; 
     else 
      counter_ddos++; 
     //System.out.println("t[i]: "+t[i]+", counter: "+counter_ddos); 

    } 

    if(counter_normal<=K/2) 
    { 
     counter1 = new Text(ddos_line + " : d : "+ counter_ddos); 
    } 
    else 
    { 
     counter1 = new Text(ddos_line + " : n : "+ (counter_normal)); 
    } 



    context.write(counter1, five); 

    //System.out.println("mapper finished");  
} 
    public void run(Context context) throws IOException, InterruptedException 
    { 
     setup(context); 
     while (context.nextKeyValue()) { 
      map(context.getCurrentKey(), context.getCurrentValue(), context); 
     } 
     cleanup(context); 
    } 
} 

ответ

1

Просто увеличить память ваших задач, то:

Набор

mapred.child.java.opts 

в конфигурации работы в

-Xmx1024m 

или более, что вы должны прочитать этот файл и обработать его ,

+0

Какова конфигурация вашего кластера? @ kishorer747 –

+0

@ SachinJanani mine? –

+0

жаль, что это было для Kishorer747, не видели вас, ответили –

Смежные вопросы