Пустой РДД
Это не может быть заменен, когда RDD
пуст:
val rdd = sc.emptyRDD[Int]
rdd.reduce(_ + _)
// java.lang.UnsupportedOperationException: empty collection at
// org.apache.spark.rdd.RDD$$anonfun$reduce$1$$anonfun$apply$ ...
rdd.fold(0)(_ + _)
// Int = 0
Вы можете совместить reduce
с условием на isEmpty
, но это довольно уродливо.
Мутабельных буфера
Другого случай использования складки агрегация с изменяемым буфером. Рассмотрите следующие RDD:
import breeze.linalg.DenseVector
val rdd = sc.parallelize(Array.fill(100)(DenseVector(1)), 8)
Допустим, мы хотим получить сумму всех элементов. Наивное решение просто сократить с +
:
rdd.reduce(_ + _)
К сожалению, это создает новый вектор для каждого элемента. Поскольку создание объекта и последующая сборка мусора являются дорогостоящими, лучше использовать изменяемый объект. Это не возможно с reduce
(неизменность РДУ не подразумевает неизменность элементов), но может быть достигнуто с fold
следующим образом:
rdd.fold(DenseVector(0))((acc, x) => acc += x)
Нулевой элемент используется здесь в качестве изменяемого буфера инициализируется один раз в раздел оставляя фактическое данные нетронутые.
акк = оп (OBJ, акк), почему эта операция порядка используется вместо согл = оп (соотв, OBJ)
См SPARK-6416 и SPARK-7683
Спасибо за ваш ответ, но не могли бы вы подробнее рассказать о примере с изменяемым буфером? Есть ли аналогичный пример в PySpark? –
Поскольку 'zeroElement' создается каждый раз, когда вызывается' fold' и не является частью данных, он может быть безопасно мутирован. PySpark частично невосприимчив к возможным последствиям мутационных данных внутри RDD, поэтому трудно найти хороший пример Python. Однако это детализация, не являющаяся частью контракта. – zero323