Использование Spark 1.1Spark - «слишком много открытых файлов» в случайном порядке
У меня есть 2 набора данных. Один из них очень большой, а другой был уменьшен (с использованием фильтрации 1: 100) до гораздо меньшего масштаба. Мне нужно уменьшить большой набор данных до того же масштаба, объединив только те элементы из меньшего списка с соответствующими аналогами в более крупном списке (эти списки содержат элементы, имеющие взаимное поле объединения).
Я делаю это с помощью следующего кода:
- "если (joinKeys! = NULL)" часть соответствующая часть
Меньший список "joinKeys", больше список является «keyedEvents "
private static JavaRDD<ObjectNode> createOutputType(JavaRDD jsonsList, final String type, String outputPath,JavaPairRDD<String,String> joinKeys) { outputPath = outputPath + "/" + type; JavaRDD events = jsonsList.filter(new TypeFilter(type)); // This is in case we need to narrow the list to match some other list of ids... Recommendation List, for example... :) if(joinKeys != null) { JavaPairRDD<String,ObjectNode> keyedEvents = events.mapToPair(new KeyAdder("requestId")); JavaRDD <ObjectNode> joinedEvents = joinKeys.join(keyedEvents).values().map(new PairToSecond()); events = joinedEvents; } JavaPairRDD<String,Iterable<ObjectNode>> groupedEvents = events.mapToPair(new KeyAdder("sliceKey")).groupByKey(); // Add convert jsons to strings and add "\n" at the end of each JavaPairRDD<String, String> groupedStrings = groupedEvents.mapToPair(new JsonsToStrings()); groupedStrings.saveAsHadoopFile(outputPath, String.class, String.class, KeyBasedMultipleTextOutputFormat.class); return events; }
Вещь при выполнении этой работы, я всегда получаю ту же ошибку:
Exception in thread "main" java.lang.reflect.InvocationTargetException
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.spark.deploy.worker.DriverWrapper$.main(DriverWrapper.scala:40)
at org.apache.spark.deploy.worker.DriverWrapper.main(DriverWrapper.scala)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 2757 in stage 13.0 failed 4 times, most recent failure: Lost task 2757.3 in stage 13.0 (TID 47681, hadoop-w-175.c.taboola-qa-01.internal): java.io.FileNotFoundException: /hadoop/spark/tmp/spark-local-20141201184944-ba09/36/shuffle_6_2757_2762 (Too many open files)
java.io.FileOutputStream.open(Native Method)
java.io.FileOutputStream.<init>(FileOutputStream.java:221)
org.apache.spark.storage.DiskBlockObjectWriter.open(BlockObjectWriter.scala:123)
org.apache.spark.storage.DiskBlockObjectWriter.write(BlockObjectWriter.scala:192)
org.apache.spark.shuffle.hash.HashShuffleWriter$$anonfun$write$1.apply(HashShuffleWriter.scala:67)
org.apache.spark.shuffle.hash.HashShuffleWriter$$anonfun$write$1.apply(HashShuffleWriter.scala:65)
scala.collection.Iterator$class.foreach(Iterator.scala:727)
scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
org.apache.spark.shuffle.hash.HashShuffleWriter.write(HashShuffleWriter.scala:65)
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
org.apache.spark.scheduler.Task.run(Task.scala:54)
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
java.lang.Thread.run(Thread.java:745)
я уже увеличил свои ulimits, выполнив следующие действия на всех кластерных системах:
echo "* soft nofile 900000" >> /etc/security/limits.conf
echo "root soft nofile 900000" >> /etc/security/limits.conf
echo "* hard nofile 990000" >> /etc/security/limits.conf
echo "root hard nofile 990000" >> /etc/security/limits.conf
echo "session required pam_limits.so" >> /etc/pam.d/common-session
echo "session required pam_limits.so" >> /etc/pam.d/common-session-noninteractive
Но не исправить мою проблему ...
После увеличения ограничений дескриптора файла вы также перезапустили все искровые демоны (и, возможно, начали новый сеанс на узле, на котором вы используете свой драйвер), чтобы новые лимиты были подняты? Также вы используете искробезопасную изоляцию или искру на YARN? Если вы используете YARN, перезапуск всех демонов YARN также может быть полезным (по той же причине). –
Я использую GCE, поэтому я каждый раз развертываю новый кластер. Кроме того, настройка ulimits выполняется в фазе инициализации кластера до запуска задания. Наконец, я не использую YARN, а скорее режим Spark Standalone. –
Поднятие '' '' '' '' '' '' '' '' '' '' '' '' '' '' '' '' '' '' '' '' '' '' '' '' '' '' '' '' '' '' '' '' '' '' '' '' '' '' '' '' '' '' '' '' '' '' /1.1.0/configuration.html#shuffle-behavior). –