Я запускаю код Mapreduce для кластера Hadoop Multi-Node Cluster (2.4.1). Я получаю ошибку превышения верхнего предела GC, когда я пытаюсь запустить с 2 входными файлами размером 200 МБ и 200 МБ. Это работает отлично и получает правильный вывод, когда я использую очень маленькие файлы.
Моя цель - сравнить каждую запись потока в 1-м файле с каждой записью потока во втором файле и вычислить расстояние, затем взять 10 максимальных значений и вывести их на редуктор на основе этих 10 макс. значения.
Пример записи потока в обоих файлах - 194.144.0.27 | 192.168.1.5 | 0.0.0.0 | 0 | 0 | 2 | 104 | 1410985350 | 1410985350 | 51915 | 51413 | 6 | 6
Mapreduce в Hadoop дает превышение верхнего предела GC при использовании файлов более 200 МБ
несколько снимков: http://goo.gl/5tUhJJ и http://goo.gl/lh1Qvm
Вот Mapper Класс:
Mapper Класс:
public class mapper extends Mapper<LongWritable, Text, Text, IntWritable>
{
private final static IntWritable five = new IntWritable(5);
private Text counter1;
ArrayList<String> lines = new ArrayList<String>();
String str;
BufferedReader br,in;
int ddos_line = 0;
int normal_line = 0,total_testing_records=4000;
int K = 10;
@Override
protected void setup(Context context) throws IOException, InterruptedException
{
//BufferedReader in = new BufferedReader(new FileReader("normal"));
Configuration conf = context.getConfiguration();
URI[] cachefiles = context.getCacheFiles();
FileSystem fs = FileSystem.get(new Configuration());
FileStatus[] status = fs.listStatus(new Path(cachefiles[0].toString()));
BufferedReader in=new BufferedReader(new InputStreamReader(fs.open(status[0].getPath())));
while((str = in.readLine()) != null)
{
lines.add(str);
}
in.close();
//System.out.println("na netti");
}
@Override
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException
{
String line1 = value.toString();
ddos_line++;
normal_line = 0;
double[] count = {-1, -1, -1, -1, -1, -1, -1, -1, -1, -1};
int[] lineIndex = {0, 0, 0, 0, 0, 0, 0, 0, 0, 0};
String[] parts = line1.split("\\|");
String[] linesArray = lines.toArray(new String[lines.size()]);
boolean bool = true;
int t1=0;
double sum=0;
while (bool)
{
for(int i=0; i<K;i++)
{
if(bool==false) break;
sum = 0;
String[] parts2 = linesArray[normal_line].split("\\|");
for(int x=0;x<13;x++)
{
if(parts[x].equals(parts2[x]))
{
t1 = 1;
}
else t1 = 0;
sum += t1;
}
sum = Math.sqrt(sum);
if(count[K-1] <= sum)
{
count[K-1] = sum;
lineIndex[K-1]=normal_line;
}
for(int k=0;k<K;k++)
{
for(int j=0;j<K-1;j++)
{
if(count[j] < count[j+1])
{
double temp2 = count[j+1];
count[j+1] = count[j];
count[j] = temp2;
int temp3 = lineIndex[j+1];
lineIndex[j+1] = lineIndex[j];
lineIndex[j] = temp3;
}
}
}
//System.out.println(ddos_line + " " + normal_line);
if (normal_line + 1 < linesArray.length)
{
normal_line++;
continue;
}
else bool = false;
}
} // while end
char[] t = {'d','d','d','d','d','d','d','d','d','d'};
for(int i=0;i<K;i++)
{
if(lineIndex[i] <= total_testing_records/2) t[i] = 'n';
}
int counter_normal=0, counter_ddos=0;
for(int i=0;i<K;i++)
{
if(t[i]=='n')
counter_normal++;
else
counter_ddos++;
//System.out.println("t[i]: "+t[i]+", counter: "+counter_ddos);
}
if(counter_normal<=K/2)
{
counter1 = new Text(ddos_line + " : d : "+ counter_ddos);
}
else
{
counter1 = new Text(ddos_line + " : n : "+ (counter_normal));
}
context.write(counter1, five);
//System.out.println("mapper finished");
}
public void run(Context context) throws IOException, InterruptedException
{
setup(context);
while (context.nextKeyValue()) {
map(context.getCurrentKey(), context.getCurrentValue(), context);
}
cleanup(context);
}
}
Какова конфигурация вашего кластера? @ kishorer747 –
@ SachinJanani mine? –
жаль, что это было для Kishorer747, не видели вас, ответили –