Я хочу использовать аккумулятор для сбора статистики о данных, которые я манипулирую на работе Spark. В идеале я бы это сделал, пока работа вычисляет необходимые преобразования, но поскольку Spark будет перекомпилировать задачи в разных случаях, аккумуляторы не будут отражать истинные показатели. Вот как документация описывает это:Когда аккумуляторы действительно надежны?
Для обновления аккумулятора, выполненное в действиях только Спарк гарантирует, что обновление каждой задачи к аккумулятору будет только применяется один раз, т.е. перезапуск задача не будет обновлять значение. В преобразованиях пользователи должны знать, что обновление каждой задачи может быть применено более одного раза, если задачи или этапы выполнения будут повторно выполнены.
Это сбивает с толку, так как большинство действия не позволит запускать пользовательский код (где можно использовать аккумуляторы), в большинстве случаев они принимают результаты предыдущих преобразований (лениво). В документации также показывает, что это:
val acc = sc.accumulator(0)
data.map(x => acc += x; f(x))
// Here, acc is still 0 because no actions have cause the `map` to be computed.
Но если мы добавим data.count()
в конце концов, будет ли это гарантированно правильно (нет дубликатов) или нет? Ясно, что acc
не используется «только внутри действий», так как карта является преобразованием. Поэтому это не должно быть гарантировано.
С другой стороны, обсуждение связанных билетов Jira говорит о «задачах результата», а не о «действиях». Например, here и here. Это, по-видимому, указывает на то, что результат действительно будет гарантированно правильным, поскольку мы используем acc
непосредственно перед действием и, следовательно, должны вычисляться как один этап.
Я предполагаю, что эта концепция «задачи результата» имеет отношение к типу задействованных операций, являясь последней, которая включает в себя действие, как в этом примере, которое показывает, как несколько операций делятся на этапы (пурпурный цвет, изображение, полученные от here):
Так гипотетический, а count()
действия в конце этой цепи будет частью той же самой конечной стадии, и я бы быть гарантирован, что аккумуляторы, используемые на на последней карте не будет никаких дубликатов?
Уточнение вокруг этой проблемы было бы здорово! Благодарю.
Ну, период награды закончился, и я до сих пор не знаю истинного ответа, поэтому до сих пор даю ему наивысший комментарий: -S –
data.count не будет запускать data.map (...), но это будет делать >>> val data2 = data.map (x => acc + = x; f (x)) >>> data2.count() –