2014-12-01 3 views
2

Использование Spark 1.1Spark - «слишком много открытых файлов» в случайном порядке

У меня есть 2 набора данных. Один из них очень большой, а другой был уменьшен (с использованием фильтрации 1: 100) до гораздо меньшего масштаба. Мне нужно уменьшить большой набор данных до того же масштаба, объединив только те элементы из меньшего списка с соответствующими аналогами в более крупном списке (эти списки содержат элементы, имеющие взаимное поле объединения).

Я делаю это с помощью следующего кода:

  • "если (joinKeys! = NULL)" часть соответствующая часть
  • Меньший список "joinKeys", больше список является «keyedEvents "

    private static JavaRDD<ObjectNode> createOutputType(JavaRDD jsonsList, final String type, String outputPath,JavaPairRDD<String,String> joinKeys) { 
    
    outputPath = outputPath + "/" + type; 
    
    JavaRDD events = jsonsList.filter(new TypeFilter(type)); 
    
    
    // This is in case we need to narrow the list to match some other list of ids... Recommendation List, for example... :) 
    if(joinKeys != null) { 
        JavaPairRDD<String,ObjectNode> keyedEvents = events.mapToPair(new KeyAdder("requestId")); 
    
        JavaRDD <ObjectNode> joinedEvents = joinKeys.join(keyedEvents).values().map(new PairToSecond()); 
    
        events = joinedEvents; 
    } 
    
    
    JavaPairRDD<String,Iterable<ObjectNode>> groupedEvents = events.mapToPair(new KeyAdder("sliceKey")).groupByKey(); 
    // Add convert jsons to strings and add "\n" at the end of each 
    JavaPairRDD<String, String> groupedStrings = groupedEvents.mapToPair(new JsonsToStrings()); 
    groupedStrings.saveAsHadoopFile(outputPath, String.class, String.class, KeyBasedMultipleTextOutputFormat.class); 
    return events; 
    } 
    

Вещь при выполнении этой работы, я всегда получаю ту же ошибку:

Exception in thread "main" java.lang.reflect.InvocationTargetException 
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) 
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
at java.lang.reflect.Method.invoke(Method.java:606) 
at org.apache.spark.deploy.worker.DriverWrapper$.main(DriverWrapper.scala:40) 
at org.apache.spark.deploy.worker.DriverWrapper.main(DriverWrapper.scala) 
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 2757 in stage 13.0 failed 4 times, most recent failure: Lost task 2757.3 in stage 13.0 (TID 47681, hadoop-w-175.c.taboola-qa-01.internal): java.io.FileNotFoundException: /hadoop/spark/tmp/spark-local-20141201184944-ba09/36/shuffle_6_2757_2762 (Too many open files) 
    java.io.FileOutputStream.open(Native Method) 
    java.io.FileOutputStream.<init>(FileOutputStream.java:221) 
    org.apache.spark.storage.DiskBlockObjectWriter.open(BlockObjectWriter.scala:123) 
    org.apache.spark.storage.DiskBlockObjectWriter.write(BlockObjectWriter.scala:192) 
    org.apache.spark.shuffle.hash.HashShuffleWriter$$anonfun$write$1.apply(HashShuffleWriter.scala:67) 
    org.apache.spark.shuffle.hash.HashShuffleWriter$$anonfun$write$1.apply(HashShuffleWriter.scala:65) 
    scala.collection.Iterator$class.foreach(Iterator.scala:727) 
    scala.collection.AbstractIterator.foreach(Iterator.scala:1157) 
    org.apache.spark.shuffle.hash.HashShuffleWriter.write(HashShuffleWriter.scala:65) 
    org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68) 
    org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) 
    org.apache.spark.scheduler.Task.run(Task.scala:54) 
    org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177) 
    java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 
    java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 
    java.lang.Thread.run(Thread.java:745) 

я уже увеличил свои ulimits, выполнив следующие действия на всех кластерных системах:

echo "* soft nofile 900000" >> /etc/security/limits.conf 
echo "root soft nofile 900000" >> /etc/security/limits.conf 
echo "* hard nofile 990000" >> /etc/security/limits.conf 
echo "root hard nofile 990000" >> /etc/security/limits.conf 
echo "session required pam_limits.so" >> /etc/pam.d/common-session 
echo "session required pam_limits.so" >> /etc/pam.d/common-session-noninteractive 

Но не исправить мою проблему ...

+0

После увеличения ограничений дескриптора файла вы также перезапустили все искровые демоны (и, возможно, начали новый сеанс на узле, на котором вы используете свой драйвер), чтобы новые лимиты были подняты? Также вы используете искробезопасную изоляцию или искру на YARN? Если вы используете YARN, перезапуск всех демонов YARN также может быть полезным (по той же причине). –

+0

Я использую GCE, поэтому я каждый раз развертываю новый кластер. Кроме того, настройка ulimits выполняется в фазе инициализации кластера до запуска задания. Наконец, я не использую YARN, а скорее режим Spark Standalone. –

+0

Поднятие '' '' '' '' '' '' '' '' '' '' '' '' '' '' '' '' '' '' '' '' '' '' '' '' '' '' '' '' '' '' '' '' '' '' '' '' '' '' '' '' '' '' '' '' '' '' /1.1.0/configuration.html#shuffle-behavior). –

ответ

2

Структура bdutil работает в пути пользователя «hadoop» - это тот, кто управляет работой. Сценарий, который развертывает кластер, создал файл /etc/security/limits.d/hadoop.conf, который переопределил настройки ulimit для пользователя «hadoop», о котором я не знал. Удалив этот файл или, альтернативно, установив нужные ulimits, я смог решить проблему.

Смежные вопросы