2014-10-22 2 views
0

Я запускаю искровой кластер с 50 машинами. Каждая машина представляет собой виртуальную машину с 8-ядерными и 50-Гбайт-памятью (41, похоже, доступна для Spark).Проблемы с памятью при работе Spark на относительно большом входе

Я работаю на нескольких папках ввода, я оцениваю размер ввода, чтобы быть ~ 250GB gz сжатым.

Хотя мне кажется, что количество и конфигурация машин я использую, кажется, достаточно, примерно через 40 минут выполнения задания потерпеть неудачу, я могу увидеть следующие ошибки в журналах:

2558733 [Result resolver thread-2] WARN org.apache.spark.scheduler.TaskSetManager - Lost task 345.0 in stage 1.0 (TID 345, hadoop-w-3.c.taboola-qa-01.internal): java.lang.OutOfMemoryError: Java heap space 

java.lang.StringCoding$StringDecoder.decode(StringCoding.java:149) 
java.lang.StringCoding.decode(StringCoding.java:193) 
java.lang.String.<init>(String.java:416) 
java.lang.String.<init>(String.java:481) 
com.doit.customer.dataconverter.Phase0$3.call(Phase0.java:699) 
com.doit.customer.dataconverter.Phase0$3.call(Phase0.java:660) 
org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$7$1.apply(JavaRDDLike.scala:164) 
org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$7$1.apply(JavaRDDLike.scala:164) 
org.apache.spark.rdd.RDD$$anonfun$13.apply(RDD.scala:596) 
org.apache.spark.rdd.RDD$$anonfun$13.apply(RDD.scala:596) 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) 
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) 
org.apache.spark.rdd.RDD.iterator(RDD.scala:229) 
org.apache.spark.rdd.FilteredRDD.compute(FilteredRDD.scala:34) 
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) 
org.apache.spark.rdd.RDD.iterator(RDD.scala:229) 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68) 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) 
org.apache.spark.scheduler.Task.run(Task.scala:54) 
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177) 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 
java.lang.Thread.run(Thread.java:745) 

а также:

2653545 [Result resolver thread-2] WARN org.apache.spark.scheduler.TaskSetManager - Lost task 122.1 in stage 1.0 (TID 392, hadoop-w-22.c.taboola-qa-01.internal): java.lang.OutOfMemoryError: GC overhead limit exceeded 

java.lang.StringCoding$StringDecoder.decode(StringCoding.java:149) 
java.lang.StringCoding.decode(StringCoding.java:193) 
java.lang.String.<init>(String.java:416) 
java.lang.String.<init>(String.java:481) 
com.doit.customer.dataconverter.Phase0$3.call(Phase0.java:699) 
com.doit.customer.dataconverter.Phase0$3.call(Phase0.java:660) 
org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$7$1.apply(JavaRDDLike.scala:164) 
org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$7$1.apply(JavaRDDLike.scala:164) 
org.apache.spark.rdd.RDD$$anonfun$13.apply(RDD.scala:596) 
org.apache.spark.rdd.RDD$$anonfun$13.apply(RDD.scala:596) 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) 
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) 
org.apache.spark.rdd.RDD.iterator(RDD.scala:229) 
org.apache.spark.rdd.FilteredRDD.compute(FilteredRDD.scala:34) 
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) 
org.apache.spark.rdd.RDD.iterator(RDD.scala:229) 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68) 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) 
org.apache.spark.scheduler.Task.run(Task.scala:54) 
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177) 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 
java.lang.Thread.run(Thread.java:745) 

Как я могу отладить такую ​​проблему?

EDIT: Я нашел причину проблемы. Именно этот кусок кода:

private static final int MAX_FILE_SIZE = 40194304; 
    .... 
    .... 
     JavaPairRDD<String, List<String>> typedData = filePaths.mapPartitionsToPair(new PairFlatMapFunction<Iterator<String>, String, List<String>>() { 
      @Override 
      public Iterable<Tuple2<String, List<String>>> call(Iterator<String> filesIterator) throws Exception { 
       List<Tuple2<String, List<String>>> res = new ArrayList<>(); 
       String fileType = null; 
       List<String> linesList = null; 
       if (filesIterator != null) { 
        while (filesIterator.hasNext()) { 
         try { 
          Path file = new Path(filesIterator.next()); 
          // filter non-trc files 
          if (!file.getName().startsWith("1")) { 
           continue; 
          } 
          fileType = getType(file.getName()); 
          Configuration conf = new Configuration(); 
          CompressionCodecFactory compressionCodecs = new CompressionCodecFactory(conf); 
          CompressionCodec codec = compressionCodecs.getCodec(file); 
          FileSystem fs = file.getFileSystem(conf); 
          ContentSummary contentSummary = fs.getContentSummary(file); 
          long fileSize = contentSummary.getLength(); 
          InputStream in = fs.open(file); 
          if (codec != null) { 
           in = codec.createInputStream(in); 
          } else { 
           throw new IOException(); 
          } 

          byte[] buffer = new byte[MAX_FILE_SIZE]; 

          BufferedInputStream bis = new BufferedInputStream(in, BUFFER_SIZE); 
          int count = 0; 
          int bytesRead = 0; 
          try { 
           while ((bytesRead = bis.read(buffer, count, BUFFER_SIZE)) != -1) { 
            count += bytesRead; 
           } 
          } catch (Exception e) { 
           log.error("Error reading file: " + file.getName() + ", trying to read " + BUFFER_SIZE + " bytes at offset: " + count); 
           throw e; 
          } 

          Iterable<String> lines = Splitter.on("\n").split(new String(buffer, "UTF-8").trim()); 
          linesList = Lists.newArrayList(lines); 

          // get rid of first line in file 

          Iterator<String> it = linesList.iterator(); 
          if (it.hasNext()) { 
           it.next(); 
           it.remove(); 
          } 
          //res.add(new Tuple2<>(fileType,linesList)); 
         } finally { 
          res.add(new Tuple2<>(fileType, linesList)); 
         } 


        } 

       } 
       return res; 
      } 

В частности выделения буфера размером 40М для каждого файла, чтобы прочитать содержимое файла с помощью BufferedInputStream. Это приводит к тому, что память стека заканчивается в какой-то момент.

Дело в том:

  • Если я читаю построчно (который не требует буфера), это будет очень неэффективное чтения
  • Если я выделяю один буфер и повторно использовать его для каждый прочитанный файл - возможно ли это в смысле параллелизма? Или он получит , переписанный несколькими темами?

Любые предложения приветствуются ...

EDIT 2: Фиксированный первая проблема памяти, перемещая байты выделения массива вне итератора, поэтому он получает повторно все элементы раздела. Но есть еще новый String (буфер, «UTF-8»). Trim()), который создается для цели разделения - это объект, который также создается каждый раз. Я мог бы использовать stringbuffer/builder, но тогда как бы установить кодировку charset без объекта String?

+0

Кажется, что вы читаете содержимое всех входных файлов в массив ArrayList в памяти? Этот вид побеждает цель работы с RDD/разделами, или я что-то упускаю? –

ответ

1

В конце концов я изменил код следующим образом:

 // Transform list of files to list of all files' content in lines grouped by type 
     JavaPairRDD<String,List<String>> typedData = filePaths.mapToPair(new PairFunction<String, String, List<String>>() { 
      @Override 
      public Tuple2<String, List<String>> call(String filePath) throws Exception { 
       Tuple2<String, List<String>> tuple = null; 
       try { 
        String fileType = null; 
        List<String> linesList = new ArrayList<String>(); 
        Configuration conf = new Configuration(); 
        CompressionCodecFactory compressionCodecs = new CompressionCodecFactory(conf); 
        Path path = new Path(filePath); 
        fileType = getType(path.getName()); 
        tuple = new Tuple2<String, List<String>>(fileType, linesList); 

        // filter non-trc files 
        if (!path.getName().startsWith("1")) { 
         return tuple; 
        } 

        CompressionCodec codec = compressionCodecs.getCodec(path); 
        FileSystem fs = path.getFileSystem(conf); 
        InputStream in = fs.open(path); 
        if (codec != null) { 
         in = codec.createInputStream(in); 
        } else { 
         throw new IOException(); 
        } 

        BufferedReader r = new BufferedReader(new InputStreamReader(in, "UTF-8"), BUFFER_SIZE); 
        // Get rid of the first line in the file 
        r.readLine(); 

        // Read all lines 
        String line; 
        while ((line = r.readLine()) != null) { 
         linesList.add(line); 
        } 
       } catch (IOException e) { // Filtering of files whose reading went wrong 
        log.error("Reading of the file " + filePath + " went wrong: " + e.getMessage()); 
       } finally { 
        return tuple; 
       } 
      } 

     }); 

Так что теперь я не использовать буфер размером 40м, а построить список линий динамически с помощью списка массива. Это решило мою текущую проблему памяти, но теперь у меня появились другие странные ошибки, которые не срабатывали. Сообщите об этом в другом вопросе ...

Смежные вопросы