2015-05-09 2 views
19

Я новичок в Spark и Scala. Я был смущен тем, как функция ReduceByKey работает в Spark. Предположим, что мы имеем следующий код:reduceByKey: Как это работает внутри?

val lines = sc.textFile("data.txt") 
val pairs = lines.map(s => (s, 1)) 
val counts = pairs.reduceByKey((a, b) => a + b) 

Функция карта ясна: s является ключевым и указывает на линии от data.txt и 1 это значение.

Однако, я не понял, как работает сокращениеByKey внутри? Указывает ли «а» на ключ? В качестве альтернативы, «a» указывает на «s»? Тогда что представляет собой + b? как они заполняются?

ответ

42

Давайте перейдем к дискретным методам и типам. Это обычно подвергает тонкости новых разработчиков:

pairs.reduceByKey((a, b) => a + b) 

становится

pairs.reduceByKey((a: Int, b: Int) => a + b) 

и переименование переменных делает его немного более явным

pairs.reduceByKey((accumulatedValue: Int, currentValue: Int) => accumulatedValue + currentValue) 

Итак, теперь мы можем видеть, что мы просто беря накопленное значение для данного ключа и суммируя его со следующим значением этого ключа. Теперь, давайте разложим его дальше, чтобы мы могли понять ключевую часть. Итак, давайте представим себе метод больше, как это:

pairs.reduce((accumulatedValue: List[(String, Int)], currentValue: (String, Int)) => { 
    //Turn the accumulated value into a true key->value mapping 
    val accumAsMap = accumulatedValue.toMap 
    //Try to get the key's current value if we've already encountered it 
    accumAsMap.get(currentValue._1) match { 
    //If we have encountered it, then add the new value to the existing value and overwrite the old 
    case Some(value : Int) => (accumAsMap + (currentValue._1 -> (value + currentValue._2))).toList 
    //If we have NOT encountered it, then simply add it to the list 
    case None => currentValue :: accumulatedValue 
    } 
}) 

Таким образом, вы можете увидеть, что уменьшить ByKey принимает шаблонного поиска ключа и отслеживания его так, что вам не придется беспокоиться об управлении, что часть.

Глубже, вернее, если вы хотите

Все, что было сказано, что это упрощенная версия того, что происходит, поскольку есть некоторые оптимизации, которые сделаны здесь. Эта операция ассоциативна, поэтому искровой двигатель сначала выполняет эти сокращения локально (часто называемый снижением на стороне карты), а затем снова у водителя.Это экономит сетевой трафик; вместо того, чтобы отправлять все данные и выполнять операцию, он может уменьшить ее как можно меньше, а затем отправить это сокращение по проводу.

3

В вашем примере

val counts = pairs.reduceByKey((a,b) => a+b) 

a и b являются Int аккумуляторы для _2 кортежей в pairs. reduceKey возьмет два кортежа с тем же значением s и использует их значения _2 как a и b, производя новый Tuple[String,Int]. Эта операция повторяется до тех пор, пока не будет только один кортеж для каждой клавиши s.

В отличие от не- Спарк (или, на самом деле, не параллельно) reduceByKey, где первый элемент всегда является аккумулятором и второе значение, reduceByKey работает в распределенном режиме, т.е. каждый узел уменьшит это множество кортежей в набор с уникальным ключом кортежей, а затем уменьшаем кортежи с нескольких узлов до тех пор, пока не будет окончательный с уникальным ключом набор кортежей. Это означает, что при уменьшении результатов от узлов a и b представляют собой уже уменьшенные аккумуляторы.

19

Одним из требований к функции reduceByKey является то, что должно быть ассоциативным. Для того, чтобы построить некоторую интуицию о том, как reduceByKey работает, давайте сначала посмотрим, как ассоциативная функция ассоциативно помогает нам в параллельных вычислений:

associative function in action

Как мы можем видеть, мы можем разбить оригинальную коллекцию на части и путем применения ассоциативной функции, мы можем накопить итог. Последовательный случай тривиален, мы привыкли к нему: 1 + 2 + 3 + 4 + 5 + 6 + 7 + 8 + 9 + 10.

Ассоциативность позволяет использовать эту же функцию последовательно и параллельно. reduceByKey использует это свойство для вычисления результата из RDD, который является распределенной коллекцией, состоящей из разделов.

Рассмотрим следующий пример:

// collection of the form ("key",1),("key,2),...,("key",20) split among 4 partitions 
val rdd =sparkContext.parallelize(((1 to 20).map(x=>("key",x))), 4) 
rdd.reduceByKey(_ + _) 
rdd.collect() 
> Array[(String, Int)] = Array((key,210)) 

В искрой, данные распределены на разделы. Для следующей иллюстрации (4) разделы расположены слева, заключенные в тонкие линии. Во-первых, мы применяем функцию локально к каждому разделу, последовательно в разделе, но мы запускаем все 4 раздела параллельно. Затем результат каждого локального вычисления агрегируется путем повторного использования той же функции и, наконец, результат.

enter image description here

reduceByKey является специализация aggregateByKeyaggregateByKey занимает 2 функции: один, который применяется к каждому разделу (последовательно), и один, который применяется среди результатов каждого раздела (параллельно). reduceByKey использует одну и ту же ассоциативную функцию в обоих случаях: выполнять последовательные вычисления на каждом разделе и затем комбинировать эти результаты с конечным результатом, как мы здесь проиллюстрировали.

Смежные вопросы