2015-04-07 2 views
40

Я хочу использовать аккумулятор для сбора статистики о данных, которые я манипулирую на работе Spark. В идеале я бы это сделал, пока работа вычисляет необходимые преобразования, но поскольку Spark будет перекомпилировать задачи в разных случаях, аккумуляторы не будут отражать истинные показатели. Вот как документация описывает это:Когда аккумуляторы действительно надежны?

Для обновления аккумулятора, выполненное в действиях только Спарк гарантирует, что обновление каждой задачи к аккумулятору будет только применяется один раз, т.е. перезапуск задача не будет обновлять значение. В преобразованиях пользователи должны знать, что обновление каждой задачи может быть применено более одного раза, если задачи или этапы выполнения будут повторно выполнены.

Это сбивает с толку, так как большинство действия не позволит запускать пользовательский код (где можно использовать аккумуляторы), в большинстве случаев они принимают результаты предыдущих преобразований (лениво). В документации также показывает, что это:

val acc = sc.accumulator(0) 
data.map(x => acc += x; f(x)) 
// Here, acc is still 0 because no actions have cause the `map` to be computed. 

Но если мы добавим data.count() в конце концов, будет ли это гарантированно правильно (нет дубликатов) или нет? Ясно, что acc не используется «только внутри действий», так как карта является преобразованием. Поэтому это не должно быть гарантировано.

С другой стороны, обсуждение связанных билетов Jira говорит о «задачах результата», а не о «действиях». Например, here и here. Это, по-видимому, указывает на то, что результат действительно будет гарантированно правильным, поскольку мы используем acc непосредственно перед действием и, следовательно, должны вычисляться как один этап.

Я предполагаю, что эта концепция «задачи результата» имеет отношение к типу задействованных операций, являясь последней, которая включает в себя действие, как в этом примере, которое показывает, как несколько операций делятся на этапы (пурпурный цвет, изображение, полученные от here):

A job dividing several operations into multiple purple stages

Так гипотетический, а count() действия в конце этой цепи будет частью той же самой конечной стадии, и я бы быть гарантирован, что аккумуляторы, используемые на на последней карте не будет никаких дубликатов?

Уточнение вокруг этой проблемы было бы здорово! Благодарю.

+0

Ну, период награды закончился, и я до сих пор не знаю истинного ответа, поэтому до сих пор даю ему наивысший комментарий: -S –

+0

data.count не будет запускать data.map (...), но это будет делать >>> val data2 = data.map (x => acc + = x; f (x)) >>> data2.count() –

ответ

16

Чтобы ответить на вопрос: «Когда аккумуляторы действительно надежны?"

Ответ:.. Когда они присутствуют в действий операции

Согласно документации в задаче действий, даже если какие-либо перезапущен задачи присутствуют она обновит Накопитель только один раз

Для обновлений аккумуляторов, выполняемых только внутри действий, Spark гарантирует, что обновление каждой задачи к аккумулятору будет применяться только один раз, то есть перезапущенные задачи не будут обновлять значение. В преобразованиях пользователи должны знать, что обновление каждой задачи может быть применено болеечем один раз, если задачи или этапы работы будут повторно выполнены.

И действие разрешает запускать собственный код.

Для примера.

val accNotEmpty = sc.accumulator(0) 
ip.foreach(x=>{ 
    if(x!=""){ 
    accNotEmpty += 1 
    } 
}) 

Но почему Карта + Действие именно. Результат Задача Операции: Не работает Аккумулятор?

  1. Задание выполнено из-за какого-либо исключения в коде. Spark попробует 4 раза (количество попыток по умолчанию). Если задача терпит неудачу каждый раз, когда она даст исключение. Если случайно это удастся, то Spark продолжит работу и просто обновит значение аккумулятора для успешного состояния, а состояния накопителей с отключенными состояниями будут проигнорированы.
    Вердикт: надлежащим образом обработано
  2. Сбой этапа: если сбой исполнительного узла, отсутствие сбоя в работе пользователя, но аппаратный сбой - и если узел опускается в режиме тасования. Когда выход в случайном порядке сохраняется локально, если узел опускается, это Вышел Shuffle. Итак, Spark возвращается на сцену, которая сгенерировала выход в случайном порядке, смотрит, какие задачи нужно перезапустить, и выполняет их на одном из узлов, которые все еще живы. После того, как мы регенерируем отсутствующий вывод в случайном порядке, этап который сгенерировал вывод карты, выполнил некоторые из его задач несколько раз. Spark подсчитывает обновления аккумулятора от всех них.
    Вердикт: не обрабатывается в задаче результата. Счетчик выдаст неверный результат.
  3. Если задача выполняется медленно, Spark может запустить спекулятивную копию этой задачи на другом узле.
    Вердикт: не обрабатывается. Счетчик выдаст неправильный результат.
  4. RDD, который кэширован, огромен и не может находиться в памяти. Поэтому, когда используется RDD, он повторно запускает операцию Map, чтобы получить RDD, и снова аккумулятор будет обновляться им.
    Вердикт: не обрабатывается. Счетчик выдаст неправильный результат.

Так может случиться, что одна и та же функция может работать несколько раз на одних и тех же данных. Так что Spark не предоставляет никаких гарантий для обновления аккумулятора из-за операции с Картой.

Так что лучше использовать Accumulator in Action в Spark.

Чтобы узнать больше о аккумуляторе и его проблемах, обратитесь к этому Blog Post - Imran Rashid.

+0

Привет! Вы в основном цитируете то же самое, что и в самом вопросе, и ответ Даниила Даравоса, но я не думаю, что это целая картина, учитывая, что документация, похоже, противоречит самому себе и другому экспертному анализу из cloudera. Вы разработчик кода искры или дизайна или просто выходите извне, как и все остальные? –

+0

@ DanielL. : Нет. Я просто пользователь Spark. Но они работали над этим довольно долгое время. На самом деле я хотел добавить часть foreach() для ответа, поскольку действие может быть выполнено как пользовательское. Так что в будущем, если какой-либо OP приходит, они могут хорошо понять Accumulator. –

+0

Конечно, но я думаю, вы ошибаетесь (и документы неясны). На самом деле, мое нынешнее понимание заключается в том, что аккумулятор действительно надежен в «задаче результата», а не в действии. Например, если вы просто выполняете 'sc.readFile (...). Map (... <использование аккумулятора здесь> ...). SaveAsFile (...)' каждая задача будет вычисляться только один раз, а аккумуляторы будут так как вся операция будет выполняться как единая единица при ленивой оценке, без промежуточных результатов, которые могут быть повторены (спекулятивно или нет). Мой опыт отражает это до сих пор, поэтому я искал авторитетный ответ. –

1

Я думаю, Матей ответил на этот вопрос в упомянутой документации:

Как обсуждалось на https://github.com/apache/spark/pull/2524 это довольно трудно обеспечить хорошую семантику в общем случае (обновления аккумулятора внутри без результирующих стадий), для следующие причины:

  • РРУ могут быть вычислены как часть нескольких этапов. Например, если вы обновили накопитель внутри MappedRDD, а затем , перетасовывайте его, это может быть один этап. Но если вы затем снова назовете map() на MappedRDD и перетасовываете результат, вы получаете второй этап , где эта карта является конвейером.Вы хотите считать это обновление аккумулятором дважды или нет?

  • Целые этапы могут быть повторно, если перетасовать файлы будут удалены с помощью периодического очистителя или потеряны из-за сбоя в узлов, поэтому все, что треки РДУ должны были бы сделать это в течение длительных периодов времени (до тех пор, RDD ссылается в программе пользователя ), что было бы довольно сложно реализовать.

Так что я собираюсь отметить это как «не исправит» на данный момент, для части за результат этапы, выполненные в СПАРК-3628, за исключением.

+1

Я читал это. Но определение того, что представляет собой «результат», не ясен. Я использовал этот текст для составления вопроса и вывел альтернативы, но у меня нет возможности проверить правильность допущений, выводов. Я надеялся, что кто-то, кто понимает процесс, проверит его. (и, возможно, объясните немного дальше). Спасибо хоть! –

14

Обновления аккумуляторов отправляются обратно к драйверу, когда задача успешно завершена. Таким образом, результаты вашего аккумулятора гарантируются, если вы уверены, что каждая задача будет выполнена ровно один раз, и каждая задача будет выполнена так, как вы ожидали.

Я предпочитаю полагаться на reduce и aggregate вместо аккумуляторов, потому что довольно сложно перечислить все способы выполнения задач.

  • Действие начинает задачи.
  • Если действие зависит от более раннего этапа, и результаты этого этапа не будут (полностью) кэшированы, тогда будут запущены задачи с более ранней стадии.
  • Спекулятивное выполнение запускает повторяющиеся задачи при обнаружении небольшого количества медленных задач.

При этом существует много простых случаев, когда аккумуляторы могут полностью доверять.

val acc = sc.accumulator(0) 
val rdd = sc.parallelize(1 to 10, 2) 
val accumulating = rdd.map { x => acc += 1; x } 
accumulating.count 
assert(acc == 10) 

Будет ли это быть гарантированно правильно (нет дубликатов)?

Да, если спекулятивное исполнение отключено. map и count будут одноэтапными, так как вы говорите, что задача не может быть успешно выполнена более одного раза.

Но аккумулятор обновляется как побочный эффект. Поэтому вы должны быть очень осторожны, думая о том, как будет выполняться код. Рассмотрим это вместо accumulating.count:

// Same setup as before. 
accumulating.mapPartitions(p => Iterator(p.next)).collect 
assert(acc == 2) 

Это также создаст одну задачу для каждого раздела, и каждая задача будет гарантированно выполнить только один раз. Но код в map не будет выполнен на всех элементах, только первый в каждом разделе.

Аккумулятор подобен глобальной переменной. Если вы разделяете ссылку на RDD, которая может наращивать аккумулятор, тогда другой код (другие потоки) может также привести к его увеличению.

// Same setup as before. 
val x = new X(accumulating) // We don't know what X does. 
          // It may trigger the calculation 
          // any number of times. 
accumulating.count 
assert(acc >= 10) 
+0

Я понимаю эти случаи. Но как тогда вы объясняете официальные документы: «Для обновлений аккумуляторов, выполняемых только внутри действий, Spark гарантирует, что обновление каждой задачи к аккумулятору будет применяться только один раз, то есть перезапущенные задачи не будут обновлять значение« ??? Разумеется, они должны быть надежными в НЕКОТОРЫХ случаях, учитывая это? Я не думаю, что «никогда» не является ответом, он, вероятно, более сложный, чем этот ... –

+2

Вы правы. Я просто протестировал его, и обновления накопителей из неудавшихся задач не учитываются. Тогда я полагаю, что если у вас отключено спекулятивное выполнение, аккумуляторы могут быть заслуживающими доверия, если вы уверены, что нет ничего, что могло бы вызвать вычисление более одного раза. Я более подробно рассмотрю код. Возможно, мне придется пересмотреть свое глубокое недоверие к аккумуляторам :). –

+0

Вы и я оба! ;-) Мой взгляд на код оставил мне больше вопросов, чем ответов, но дайте мне знать, если вы сделаете какой-то вывод. Ты получаешь щедрость, если хочешь. –

Смежные вопросы