Во время выполнения моей программы Spark иногда (причина для меня по-прежнему остается загадкой) пряжа убивает контейнеры (исполнители), давая сообщение о превышении предела памяти. Моя программа восстанавливается, хотя Spark повторно выполняет задачу, создавая новый контейнер. Однако в моей программе задача также создает некоторые промежуточные файлы на диске. Когда контейнер убит, эти файлы остаются позади. Есть ли способ, которым я могу поймать исполнителя, убитого как исключение, чтобы я мог удалить оставшиеся промежуточные файлы. Очевидно, что код обработки исключений также должен выполняться на том же узле, на котором выполнялся исполнитель, поэтому я могу удалить файлы оттуда.Есть ли способ поймать убитого исполнителя в Искры?
ответ
Добавление поверх @Taras Matyashovskyy ответ.
You can Use SparkListener and intercept SparkListener (Executor) events.
Ниже перечислены доступные события прослушивателя.
SparkListenerApplicationStart
SparkListenerJobStart
SparkListenerStageSubmitted
SparkListenerTaskStart
SparkListenerTaskGettingResult
SparkListenerTaskEnd
SparkListenerStageCompleted
SparkListenerJobEnd
SparkListenerApplicationEnd
SparkListenerEnvironmentUpdate
SparkListenerBlockManagerAdded
SparkListenerBlockManagerRemoved
SparkListenerBlockUpdated
SparkListenerUnpersistRDD
SparkListenerExecutorAdded
SparkListenerExecutorRemoved
Пример:HeartBeatReceiver.scala
/**
* Lives in the driver to receive heartbeats from executors..
*/
private[spark] class HeartbeatReceiver(sc: SparkContext, clock: Clock)
extends SparkListener with ThreadSafeRpcEndpoint with Logging {
def this(sc: SparkContext) {
this(sc, new SystemClock)
}
sc.addSparkListener(this) ...
Пожалуйста, посмотрите в к removed reason, который может вам подходит (я не пробовал)
В качестве опции вы можете попробовать использовать функциональность SparkListener
. Итак, вы можете создать свой собственный класс и реализовать интерфейс SparkListener
, чтобы подключаться к имеющимся событиям, которые совершенно не требуют пояснений. Затем вам нужно добавить этот пользовательский прослушиватель в SparkContext
.
2 опции:
SparkContext.addSparkListener(<your custom listener>)
- Via
spark.extraListeners
недвижимость, подробнее здесь http://spark.apache.org/docs/latest/configuration.html#available-properties
спасибо за ответ. Но похоже, что SparkListener выполняется на узле драйвера. Итак, как я могу удалить файлы с узла, где исполнялся убитый исполнитель? – pythonic
Это невозможно из коробки. Вам нужно перехватить событие и выполнить ping другой удаленный процесс, который будет отвечать за очистку. –
Я вижу. Но SparkListener должен выполняться на узле драйвера, поэтому как я могу убить файлы с узла, где исполнялся убитый исполнитель? – pythonic
Программа драйвера должна прослушивать и принимать входящие соединения от своих исполнителей на протяжении всего срока ее службы (например, см. Spark.driver.port и spark.fileserver.port в разделе конфигурации сети). Таким образом, программа драйвера должна быть перенаправлена по сети с рабочих узлов. http://spark.apache.org/docs/latest/cluster-overview.html –
см. этот класс HeartbeatReceiver.scala в качестве примера https://github.com/apache/spark/blob/d5911d1173fe0872f21cae6c47abf8ff479345a4/core/src/ main/scala/org/apache/spark/HeartbeatReceiver.scala также обновлен в ответ –