2016-06-13 2 views
2

Во время выполнения моей программы Spark иногда (причина для меня по-прежнему остается загадкой) пряжа убивает контейнеры (исполнители), давая сообщение о превышении предела памяти. Моя программа восстанавливается, хотя Spark повторно выполняет задачу, создавая новый контейнер. Однако в моей программе задача также создает некоторые промежуточные файлы на диске. Когда контейнер убит, эти файлы остаются позади. Есть ли способ, которым я могу поймать исполнителя, убитого как исключение, чтобы я мог удалить оставшиеся промежуточные файлы. Очевидно, что код обработки исключений также должен выполняться на том же узле, на котором выполнялся исполнитель, поэтому я могу удалить файлы оттуда.Есть ли способ поймать убитого исполнителя в Искры?

ответ

4

Добавление поверх @Taras Matyashovskyy ответ.

You can Use SparkListener and intercept SparkListener (Executor) events.

Ниже перечислены доступные события прослушивателя.

  • SparkListenerApplicationStart

  • SparkListenerJobStart

  • SparkListenerStageSubmitted

  • SparkListenerTaskStart

  • SparkListenerTaskGettingResult

  • SparkListenerTaskEnd

  • SparkListenerStageCompleted

  • SparkListenerJobEnd

  • SparkListenerApplicationEnd

  • SparkListenerEnvironmentUpdate

  • SparkListenerBlockManagerAdded

  • SparkListenerBlockManagerRemoved

  • SparkListenerBlockUpdated

  • SparkListenerUnpersistRDD

  • SparkListenerExecutorAdded

  • SparkListenerExecutorRemoved

Пример:HeartBeatReceiver.scala

/** 
* Lives in the driver to receive heartbeats from executors.. 
*/ 
private[spark] class HeartbeatReceiver(sc: SparkContext, clock: Clock) 
    extends SparkListener with ThreadSafeRpcEndpoint with Logging { 

    def this(sc: SparkContext) { 
    this(sc, new SystemClock) 
    } 

    sc.addSparkListener(this) ... 

Пожалуйста, посмотрите в к removed reason, который может вам подходит (я не пробовал)

+0

Я вижу. Но SparkListener должен выполняться на узле драйвера, поэтому как я могу убить файлы с узла, где исполнялся убитый исполнитель? – pythonic

+1

Программа драйвера должна прослушивать и принимать входящие соединения от своих исполнителей на протяжении всего срока ее службы (например, см. Spark.driver.port и spark.fileserver.port в разделе конфигурации сети). Таким образом, программа драйвера должна быть перенаправлена ​​по сети с рабочих узлов. http://spark.apache.org/docs/latest/cluster-overview.html –

+1

см. этот класс HeartbeatReceiver.scala в качестве примера https://github.com/apache/spark/blob/d5911d1173fe0872f21cae6c47abf8ff479345a4/core/src/ main/scala/org/apache/spark/HeartbeatReceiver.scala также обновлен в ответ –

5

В качестве опции вы можете попробовать использовать функциональность SparkListener. Итак, вы можете создать свой собственный класс и реализовать интерфейс SparkListener, чтобы подключаться к имеющимся событиям, которые совершенно не требуют пояснений. Затем вам нужно добавить этот пользовательский прослушиватель в SparkContext.

2 опции:

+0

спасибо за ответ. Но похоже, что SparkListener выполняется на узле драйвера. Итак, как я могу удалить файлы с узла, где исполнялся убитый исполнитель? – pythonic

+0

Это невозможно из коробки. Вам нужно перехватить событие и выполнить ping другой удаленный процесс, который будет отвечать за очистку. –