2015-07-21 4 views
7

, когда я пытаюсь запустить его на этой папке он бросает мне ExecutorLostFailure каждый разExecutorLostFailure Ошибка при выполнении задачи в Спарк

Привет Я новичок в Спарк. Я пытаюсь запустить работу на Spark 1.4.1 с 8 подчиненными узлами с 11,7 ГБ памяти на каждом диске объемом 3,2 ГБ. Я запускаю задачу Spark из одного из ведомых узлов (из 8 узлов) (поэтому с 0,7 долями хранения около 4,8 Гб доступно только на каждом узле) и с использованием Mesos в качестве менеджера кластеров. Я использую эту конфигурацию:

spark.master mesos://uc1f-bioinfocloud-vamp-m-1:5050 
spark.eventLog.enabled true 
spark.driver.memory 6g 
spark.storage.memoryFraction 0.7 
spark.core.connection.ack.wait.timeout 800 
spark.akka.frameSize 50 
spark.rdd.compress true 

Я пытаюсь запустить Спарк MLlib Наивного алгоритма Байеса на папке около 14 Гбайтов данных. (Нет проблемы при запуске задачи в папке с 6 ГБ). Я читаю эту папку из хранилища Google как RDD и даю 32 в качестве параметра раздела. (Я также попытался увеличить раздел). Затем с помощью TF создайте вектор признаков и предскажите на основе этого. Но когда я пытаюсь запустить его в этой папке, он бросает меня ExecutorLostFailure каждый раз. Я пробовал разные конфигурации, но ничего не помогает. Может быть, мне не хватает чего-то очень простого, но не могу понять. Любая помощь или предложение будут очень ценными.

Log является:

15/07/21 01:18:20 ERROR TaskSetManager: Task 3 in stage 2.0 failed 4 times; aborting job  
15/07/21 01:18:20 INFO TaskSchedulerImpl: Cancelling stage 2  
15/07/21 01:18:20 INFO TaskSchedulerImpl: Stage 2 was cancelled  
15/07/21 01:18:20 INFO DAGScheduler: ResultStage 2 (collect at /opt/work/V2ProcessRecords.py:213) failed in 28.966 s  
15/07/21 01:18:20 INFO DAGScheduler: Executor lost: 20150526-135628-3255597322-5050-1304-S8 (epoch 3)  
15/07/21 01:18:20 INFO BlockManagerMasterEndpoint: Trying to remove executor 20150526-135628-3255597322-5050-1304-S8 from BlockManagerMaster.  
15/07/21 01:18:20 INFO DAGScheduler: Job 2 failed: collect at /opt/work/V2ProcessRecords.py:213, took 29.013646 s  
Traceback (most recent call last):  
    File "/opt/work/V2ProcessRecords.py", line 213, in <module> 
    secondPassRDD = firstPassRDD.map(lambda (name, title, idval, pmcId, pubDate, article, tags , author, ifSigmaCust, wclass): (str(name), title, idval, pmcId, pubDate, article, tags , author, ifSigmaCust , "Yes" if ("PMC" + pmcId) in rddNIHGrant else ("No") , wclass)).collect()  
    File "/usr/local/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 745, in collect  
    File "/usr/local/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 538, in __call__  
    File "/usr/local/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line 300, in get_return_value 
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe. 
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 3 in stage 2.0 failed 4 times, most recent failure: Lost task 3.3 in stage 2.0 (TID 12, vamp-m-2.c.quantum-854.internal): ExecutorLostFailure (executor 20150526-135628-3255597322-5050-1304-S8 lost)  
Driver stacktrace:  
     at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1266) 
     at  org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1257) 
     at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1256) 
     at  scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) 
     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) 
     at  org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1256) 
     at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730) 
     at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730) 
     at scala.Option.foreach(Option.scala:236) 
     at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:730) 
     at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1450) 
     at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1411) 
     at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) 

15/07/21 01:18:20 INFO BlockManagerMaster: Removed 20150526-135628-3255597322-5050-1304-S8 successfully in removeExecutor 
15/07/21 01:18:20 INFO DAGScheduler: Host added was in lost list earlier:vamp-m-2.c.quantum-854.internal 
Jul 21, 2015 1:01:15 AM INFO: parquet.hadoop.ParquetFileReader: Initiating action with parallelism: 5 
15/07/21 01:18:20 INFO SparkContext: Invoking stop() from shutdown hook 



{"Event":"SparkListenerTaskStart","Stage ID":2,"Stage Attempt ID":0,"Task Info":{"Task ID":11,"Index":6,"Attempt":2,"Launch Time":1437616381852,"Executor ID":"20150526-135628-3255597322-5050-1304-S8","Host":"uc1f-bioinfocloud-vamp-m-2.c.quantum-device-854.internal","Locality":"PROCESS_LOCAL","Speculative":false,"Getting Result Time":0,"Finish Time":0,"Failed":false,"Accumulables":[]}} 

{ "Событие": "SparkListenerExecutorRemoved", "Отметка": 1437616389696, "Палач ID": "20150526-135628-3255597322-5050-1304-S8", "Удален Причина ":« Потерянный исполнитель »} {« Событие »:« SparkListenerTaskEnd »,« Идентификатор этапа »: 2,« Идентификатор сцены »: 0,« Тип задачи »:« ResultTask »,« Причина завершения задачи »: {" «Причина»: «ExecutorLostFailure», «Executor ID»: «20150526-135628-3255597322-5050-1304-S8»}, «Информация о задаче»: {«Идентификатор задачи»: 11, «Индекс»: 6, «Попытка»: 2, «Время запуска»: 1437616381852, «Идентификатор исполнителя»: «20150526-135628-3255597322-5050-1304-S8», «Хост»: «uc1f-bioinfocloud-vamp-m-2.c.quantum-device-854 .Internal "," Locality ":" PROCESS_LOCAL "," Спекулятивный ": false," Получение времени результата ": 0," F inish Time ": 1437616389697,« Failed »: true,« Accumulables »: []}} {« Событие »:« SparkListenerExecutorAdded »,« Timestamp »: 1437616389707,« Идентификатор исполнителя »:« 20150526-135628-3255597322-5050- 1304-S8 »,« Информация об исполнителе »: {« Хост »:« uc1f-bioinfocloud-vamp-m-2.c.quantum-device-854.internal »,« Total Cores »: 1,« Log Urls »: { }}} {«Событие»: «SparkListenerTaskStart», «Идентификатор этапа»: 2, «Идентификатор этапа»: 0, «Информация о задаче»: {«Идентификатор задачи»: 12, «Индекс»: 6, «Попытка» : 3, «Время запуска»: 1437616389702, «Идентификатор исполнителя»: «20150526-135628-3255597322-5050-1304-S8», «Host»: «uc1f-bioinfocloud-vamp-m-2.c.quantum-device- 854.internal "," Locality ":" PROCESS_LOCAL "," Спекулятивный ": false," Получение времени результата ": 0," Finish Time ": 0," Failed ": false," Accumulables ": []}} { «Событие»: «SparkListenerExecutorRemoved», «Timestamp»: 1437616397743, «Идентификатор исполнителя»: «20150526-135628-3255597322-5050-1304-S8», «Убрано Причина»: «Потерянный исполнитель»} {«Событие»: SparkListenerTaskEnd "" Идентификатор этапа ": 2,« Идентификатор сцены »: 0,« Тип задачи »:« Результат »,« Причина завершения задачи »: {« Причина »:« ExecutorLostFailure »,« Идентификатор исполнителя »:« 20150526-135628-3255597322- 5050-1304-S8 "},« Информация о задаче »: {« Идентификатор задачи »: 12,« Индекс »: 6,« Попытка »: 3,« Время запуска »: 1437616389702,« Идентификатор исполнителя »:« 20150526-135628- 3255597322-5050-1304-S8" , "Хост": "uc1f-bioinfocloud-вамп-м-2.c.quantum-устройства-854.internal", "Местность": "PROCESS_LOCAL", "Спекулятивный" ложь» «Время выполнения»: 0, «Время окончания»: 1437616397743, «Сбой»: true, «Accumulables»: []}} {«Событие»: «SparkListenerStageCompleted», «Информация о шаге»: {«Идентификатор этапа»: 2 , «Идентификатор сцены»: 0, «Название сцены»: «собирать в /opt/work/V2ProcessRecords.py:215», «Количество задач»: 72, «Информация о RDD»: [{«ID RDD»: 6 , «Имя»: «PythonRDD», «Parent ID»: [0], «Уровень хранения»: {«Использовать диск»: false, «Использовать память»: false, «Использовать ExternalBlockStore»: false, «Deserialized»: false , «Репликация»: 1}, «Количество разделов»: 72, «Количество кэшированных разделов»: 0, «Размер памяти»: 0, «Размер внешнего блока блокировки»: 0, «Размер диска»: 0}, {«RDD ID ": 0," Имя ":" гс: // uc1f-bioinfocloud-вамп-м/литература/XML/P */*.nxml "," Scope ":" {\ "id \": \ "0 \", \ "name \": \ "wholeTextFiles \"} "," Parent IDs ": []," Storage Level ": {" Используйте «Диск»: false, «Использовать память»: false, «Использовать ExternalBlockStore»: false, «Deserialized»: false, «Replication»: 1}, «Number of Partitions»: 72, «Number of Cached Partitions»: 0, «Размер памяти»: 0, «ExternalBlockStore Size»: 0, «Размер диска»: 0}], «Родительские идентификаторы»: [], «Подробности»: «», «Время отправки»: 1437616365566, «Время завершения»: 1437616397753, «Причина отказа»: «Работа прерывается из-за срыва этапа: Задача 6 на этапе 2.0 провалилась 4 раза, последний сбой: Потерянная задача 6.3 на этапе 2.0 (TID 12, uc1f-bioinfocloud-vamp-m-2.c. квантовое устройство-854.внутреннее): ExecutorLostFailure (исполнитель 20150526-135628-3255597322-5050-1304-S8 потерян) \ nDriver stacktrace: "," Accumulables ": []}} {" Событие ":" SparkListenerJobEnd "," Идентификатор работы ": 2,« Время завершения »: 1437616397755,« Результат работы »: {« Результат »:« Работа с ошибкой »,« Исключение »: {« Сообщение »:« Работа прерывается из-за срыва этапа: Задача 6 на этапе 2.0 не выполнена 4 раза, последний сбой: Потерянная задача 6.3 i n ступень 2.0 (TID 12, uc1f-bioinfocloud-vamp-m-2.c.quantum-device-854.внутренний): ExecutorLostFailure (исполнитель 20150526-135628-3255597322-5050-1304-S8 потерян) \ nDriver stacktrace: ", «Трассировка стека»: [{«Объявление класса»: «org.apache.spark.scheduler.DAGScheduler», «Имя метода»: «org $ apache $ spark $ scheduler $ DAGScheduler $$ failJobAndIndependentStages», «Имя файла»: DAGScheduler.scala "," Номер строки ": 1266}, {" Объявление класса ":" org.apache.spark.scheduler.DAGScheduler $$ anonfun $ abortStage $ 1 "," Имя метода ":" apply "," File Name " : «DAGScheduler.scala», «Номер строки»: 1257}, {«Объявление класса»: «org.apache.spark.scheduler.DAGScheduler $$ anonfun $ abortStage $ 1», «Имя метода»: «применить», «Файл «Имя»: «DAGScheduler.scala», «Номер строки»: 1256}, {«Объявление класса»: «scala.collection.mutable.ResizableArray $ class», «Имя метода»: «foreach», «Имя файла»: ResizableArray.scala "," Номер строки ": 59}, {" Объявление класса ":" scala.collection.mutable.ArrayBuffer "," Имя метода ":" foreach "," Имя файла ":" ArrayBuffer.scala "," Номер строки ": 47}, {" Объявление класса ":" org.apache.spark.scheduler.DAGScheduler "," Имя метода ":" abortStage "," Имя файла ":" DAGScheduler.scala "," Номер строки ": 1256}, {" Объявление класса ":" org .apache.spark.scheduler.DAGScheduler $$ anonfun $ handleTaskSetFailed $ 1 "," Имя метода ":« применить »,« Имя файла »:« DAGScheduler.scala »,« Номер строки »: 730}, {« Объявление класса »: «org.apache.spark.scheduler.DAGScheduler $$ anonfun $ handleTaskSetFailed $ 1», «Имя метода»: «применить», «Имя файла»: «DAGScheduler.scala», «Номер строки»: 730}, {«Объявление класса msgstr "" ":". «DAGScheduler», «Имя метода»: «handleTaskSetFailed», «Имя файла»: «DAGScheduler.scala», «Номер строки»: 730}, {«Объявление класса»: «org.apache.spark.scheduler.DAGSchedulerEventProcessLoop», «Имя метода»: «onReceive», «Имя файла»: «DAGScheduler.scala», «Номер строки»: 1450}, {«Объявление класса»: «org.apache.spark.scheduler.DAGSchedulerEventProcessLoop», «Имя метода», : "onReceive", "Имя файла": "DAGScheduler.scala" , «Номер строки»: 1411}, {«Объявление класса»: «org.apache.spark.util.EventLoop $$ anon $ 1», «Имя метода»: «запустить», «Имя файла»: «EventLoop.scala», , «Номер строки»: 48}]}}}

+0

Искры версии 1.4.1 – User17

ответ

2

Трудно сказать, в чем проблема, если нет журнала неудавшегося исполнителя, а не драйвера, но, скорее всего, это проблема с памятью. Постарайтесь значительно увеличить номер раздела (если ваш ток равен 32 попытке 200)

+0

Я пробовал его с 200 разделами, но он не срабатывает даже тогда. Даже с 800 разделами с некоторыми другими настройками конфигурации. Но – User17

+0

Но у меня такая же проблема. Задача потеряна 4 раза, чем ExecutorLostFailure. Иногда я получаю время соединения. Кроме того, поскольку я нахожусь в Google Cloud Mesos Cluster, я попробовал и искал журналы, как вы предложили, и посмотрел на var/log/mesos (ведущие и ведомые журналы как в/var/log/mesos по умолчанию, так и предложены в документации по искровому мезосу), но Я не нашел никакой хорошей информации. Есть ли другой журнал, который я могу посмотреть или разместить здесь? С журналами исполнителей вы имели в виду то же самое? – User17

3

Эта ошибка возникает из-за сбоя задачи более четырех раз. Попробуйте увеличить параллелизм в кластере, используя следующий параметр.

--conf "spark.default.parallelism=100" 

Установите значение параллелизма в 2 - 3 раза количество ядер, доступных на вашем кластере. Если это не сработает. попробуйте увеличить параллельность экспоненциальным образом. т. е. если ваш текущий параллелизм не работает, умножьте его на два и так далее. Также я заметил, что это помогает, если ваш уровень параллелизма является простым числом, особенно если вы используете groupByKkey.

+0

Увеличение номера раздела не помогает моему делу. Но установка номера параллельности помогает. – kennyut

2

У меня была эта проблема, и проблема для меня была очень высокой частотой одного ключа в задаче reduceByKey. Это было (я думаю), в результате чего массивный список собирался на одного из исполнителей, который затем выдавал ошибки OOM.

Решение для меня состояло в том, чтобы просто отфильтровать ключи с высокой численностью населения, прежде чем делать reduceByKey, но я ценю, что это может быть или не быть возможно в зависимости от вашего приложения. В любом случае, мне не нужны все мои данные.

1

Самая распространенная причина ExecutorLostFailure в соответствии с моим пониманием - это OOM в исполнителе.

Чтобы решить проблему OOM, необходимо выяснить, что именно вызывает ее. Простое увеличение параллелизма по умолчанию или увеличение памяти исполнителя не является стратегическим решением.

Если вы посмотрите на то, что увеличивает параллелизм, он пытается создать больше исполнителей, чтобы каждый исполнитель мог работать с меньшими и меньшими данными. Но если ваши данные искажены так, что ключ, на котором происходит разбиение данных (для параллелизма), имеет больше данных, просто увеличение параллелизма не будет иметь никакого эффекта.

Точно так же, увеличивая память Executor, это будет очень неэффективным способом передачи такого сценария, как если бы только один исполнитель терпел неудачу с ExecutorLostFailure, запрос на увеличение памяти для всех исполнителей заставит ваше приложение требовать гораздо больше памяти, чем ожидалось.