Давайте перейдем к дискретным методам и типам. Это обычно подвергает тонкости новых разработчиков:
pairs.reduceByKey((a, b) => a + b)
становится
pairs.reduceByKey((a: Int, b: Int) => a + b)
и переименование переменных делает его немного более явным
pairs.reduceByKey((accumulatedValue: Int, currentValue: Int) => accumulatedValue + currentValue)
Итак, теперь мы можем видеть, что мы просто беря накопленное значение для данного ключа и суммируя его со следующим значением этого ключа. Теперь, давайте разложим его дальше, чтобы мы могли понять ключевую часть. Итак, давайте представим себе метод больше, как это:
pairs.reduce((accumulatedValue: List[(String, Int)], currentValue: (String, Int)) => {
//Turn the accumulated value into a true key->value mapping
val accumAsMap = accumulatedValue.toMap
//Try to get the key's current value if we've already encountered it
accumAsMap.get(currentValue._1) match {
//If we have encountered it, then add the new value to the existing value and overwrite the old
case Some(value : Int) => (accumAsMap + (currentValue._1 -> (value + currentValue._2))).toList
//If we have NOT encountered it, then simply add it to the list
case None => currentValue :: accumulatedValue
}
})
Таким образом, вы можете увидеть, что уменьшить ByKey принимает шаблонного поиска ключа и отслеживания его так, что вам не придется беспокоиться об управлении, что часть.
Глубже, вернее, если вы хотите
Все, что было сказано, что это упрощенная версия того, что происходит, поскольку есть некоторые оптимизации, которые сделаны здесь. Эта операция ассоциативна, поэтому искровой двигатель сначала выполняет эти сокращения локально (часто называемый снижением на стороне карты), а затем снова у водителя.Это экономит сетевой трафик; вместо того, чтобы отправлять все данные и выполнять операцию, он может уменьшить ее как можно меньше, а затем отправить это сокращение по проводу.