2015-09-28 3 views
6

При запуске sparkJob на кластере, прошедшем определенный размер данных (~ 2,5 гб), я получаю либо «Job отменен, потому что SparkContext был выключен», либо «проиграл исполнитель» ». Когда я смотрю на пряжу gui, я вижу, что работа, которая была убита, была успешной. Нет проблем при работе с данными размером 500 МБ. Я искал решение и обнаружил, что: - «кажется, что пряжа убивает некоторых исполнителей, поскольку они требуют больше памяти, чем ожидалось».«sparkContext был выключен» при запуске искры на большом наборе данных

Любые предложения по его отладке?

команда, которую я представляю свою искру работу с:

/opt/spark-1.5.0-bin-hadoop2.4/bin/spark-submit --driver-memory 22g --driver-cores 4 --num-executors 15 --executor-memory 6g --executor-cores 6 --class sparkTesting.Runner --master yarn-client myJar.jar jarArguments 

и sparkContext настройки

val sparkConf = (new SparkConf() 
    .set("spark.driver.maxResultSize", "21g") 
    .set("spark.akka.frameSize", "2011") 
    .set("spark.eventLog.enabled", "true") 
    .set("spark.eventLog.enabled", "true") 
    .set("spark.eventLog.dir", configVar.sparkLogDir) 
    ) 

упрощенный код, который не выглядит как тот

val hc = new org.apache.spark.sql.hive.HiveContext(sc) 
val broadcastParser = sc.broadcast(new Parser()) 

val featuresRdd = hc.sql("select "+ configVar.columnName + " from " + configVar.Table +" ORDER BY RAND() LIMIT " + configVar.Articles) 
val myRdd : org.apache.spark.rdd.RDD[String] = featuresRdd.map(doSomething(_,broadcastParser)) 

val allWords= featuresRdd 
    .flatMap(line => line.split(" ")) 
    .count 

val wordQuantiles= featuresRdd 
    .flatMap(line => line.split(" ")) 
    .map(word => (word, 1)) 
    .reduceByKey(_ + _) 
    .map(pair => (pair._2 , pair._2)) 
    .reduceByKey(_+_) 
    .sortBy(_._1) 
    .collect 
    .scanLeft((0,0.0)) ((res,add) => (add._1, res._2+add._2)) 
    .map(entry => (entry._1,entry._2/allWords)) 

val dictionary = featuresRdd 
    .flatMap(line => line.split(" ")) 
    .map(word => (word, 1)) 
    .reduceByKey(_ + _) // here I have Rdd of word,count tuples 
    .filter(_._2 >= moreThan) 
    .filter(_._2 <= lessThan) 
    .filter(_._1.trim!=("")) 
    .map(_._1) 
    .zipWithIndex 
    .collect 
    .toMap 

И ошибки стека

Exception in thread "main" org.apache.spark.SparkException: Job cancelled because SparkContext was shut down 
at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:703) 
at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:702) 
at scala.collection.mutable.HashSet.foreach(HashSet.scala:79) 
at org.apache.spark.scheduler.DAGScheduler.cleanUpAfterSchedulerStop(DAGScheduler.scala:702) 
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onStop(DAGScheduler.scala:1511) 
at org.apache.spark.util.EventLoop.stop(EventLoop.scala:84) 
at org.apache.spark.scheduler.DAGScheduler.stop(DAGScheduler.scala:1435) 
at org.apache.spark.SparkContext$$anonfun$stop$7.apply$mcV$sp(SparkContext.scala:1715) 
at org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1185) 
at org.apache.spark.SparkContext.stop(SparkContext.scala:1714) 
at org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend$MonitorThread.run(YarnClientSchedulerBackend.scala:146) 
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:567) 
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1813) 
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1826) 
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1839) 
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1910) 
at org.apache.spark.rdd.RDD.count(RDD.scala:1121) 
at sparkTesting.InputGenerationAndDictionaryComputations$.createDictionary(InputGenerationAndDictionaryComputations.scala:50) 
at sparkTesting.Runner$.main(Runner.scala:133) 
at sparkTesting.Runner.main(Runner.scala) 
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
at java.lang.reflect.Method.invoke(Method.java:483) 
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:672) 
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180) 
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205) 
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:120) 
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) 
+4

По моему опыту это почти всегда связано с исключениями OOM. Попробуйте просмотреть файлы журнала на машинах отдельных исполнителей. –

+1

Я бы напечатал stacktrace из вашей работы и контролировал размер JVM Heap с помощью некоторых инструментов утилиты Java: jstat, jstatd, jconsole ..., чтобы узнать больше об ограничении. Если у вас все еще есть физическая память, вы можете увеличить размер памяти JVM перед запуском приложения! Вы можете изменить размеры своих коллекций на основе оптимизированного размера кучи. –

ответ

4

Найдено ответов.

Моя таблица была сохранена как файл avro 20gb. Когда исполнители пытались его открыть. Каждый из них должен был загрузить 20 ГБ в память. Решил его использовать csv вместо avro

1

Симптомы типичны для ошибки OutOfMemory в одной задаче исполнителя. Попробуйте увеличить память для исполнителя при выполнении задания. См. Параметр -executor-memory saprk-submit, искровой оболочки и т. Д. Значение по умолчанию: 1G

1

Другая возможная причина ошибки «SparkContext shutdown» заключается в том, что вы импортируете файл jar после оценки какого-либо другого кода. (Это может произойти только в Ноутбуке Spark.)

Чтобы устранить проблему, переместите все свои операторы :cp myjar.jar в начало вашего файла.

Смежные вопросы