2016-05-10 2 views
0

Этот код ниже должен найти Per-Key Average с помощью combineByKey():CombineBy Key Спарк Метод

val result = input.combineByKey(
(v) => (v, 1), 
(acc: (Int, Int), v) => (acc._1 + v, acc._2 + 1), 
(acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2)). 
map{ case (key, value) => (key, value._1/value._2.toFloat) } 
result.collectAsMap().map(println(_)) 

Я смущен о выполнении описанного выше способа. Предположим, у нас есть набор данных
((1,1), (1,3), (2,4), (2,3), (3,1)).

Так исполнение combineByKey будет выглядеть примерно так ?:

1) Во-первых это создаст аккумулятор с (1,1).
2) Затем, когда он встречает кортеж с тем же ключом (1), он будет добавлять значения ключей вместе? Поэтому, когда он встречает (1,3), новый аккумулятор для ключа 1 будет выглядеть как (2,2). Так как он добавил ключи от (1,1) and (1,3), и так как есть два кортежа с ключом 1, он поместит 2 (с правой стороны) в (2,2).
3) Затем он будет продолжать делать это для всех тех же клавиш.
4) Затем в конце он возьмет все аккумуляторы от каждого из разделов и добавит ключи (левая сторона кортежа) и количество раз, когда это произошло (правая сторона кортежа), в один кортеж для каждого ключа.

Извините, если это немного, я все еще привык к функциональным методам программирования!

ответ

1

Как это часто бывает, большая ясность может быть получена при изучении типов метода и содержащего класс.

PairRDDFunctions[K, V]

def combineByKey[C]( createCombiner: (V) ⇒ C, mergeValue: (C, V) ⇒ C, mergeCombiners: (C, C) ⇒ C): RDD[(K, C)]

У нас есть класс с 2 Params типа, ключ и значение, и метод с еще одной, объединитель.

Вас просят предоставить функции

  • превратить значение в объединитель
  • превратить значение и объединитель в объединитель
  • превратить объединитель и объединитель в объединитель

Сразу же это делает ваше описание добавления ключей вместе невозможным, поскольку мы не предоставляем никаких методов, которые работают с ключами.

Для каждого ключа:

  1. Сначала это создаст Combiner от значения, в этом случае, поставив значение в первый слот tuple2 с 1 во втором слоте (1, 1).
  2. Затем он объединяет каждое дополнительное значение для того же ключа в Combiner, добавляя значение в первый слот кортежа 2 и увеличивая второй слот. (1 + 3, 1 + 1) == (4, 2)
  3. Затем он будет продолжать делать это для всех записей для одного и того же ключа.
  4. Тогда в конце он возьмет все аккумуляторы от каждого из разделов и добавит значения (левая сторона кортежа) и количество раз, когда это произошло (правая сторона кортежа), в один кортеж для каждый ключ.

Ваше замешательство может быть связано с тем, что ваш ключ и стоимость одного типа. Ваш код будет скомпилирован, если вы изменили ключи на Strings, но не, если вы сделали это со значениями.

+0

О, вы указали именно то, что меня смутило, факт, что и ключ, и значение были цифрами! Большое спасибо за подробное объяснение, это имеет смысл сейчас! – LP496

+0

Также просто прояснить в самом конце, когда он «берет все аккумуляторы из каждого из разделов и добавляет значения (левая сторона кортежа) и количество раз, которое оно произошло (правая сторона кортежа), в один кортеж для каждого ключа ». Он знает, к какому ключу относится каждая пара значений ключей от разных разделов. Итак, скажем, из разделов 1 и 2 получаем (5,4) и (3,2) для ключа 1 соответственно. И (3,4) и (4,5) для ключа 2 соответственно. Так искра будет знать, чтобы сгруппировать ключ 1 кортеж вместе и ключ 2 кортежа вместе? – LP496

+1

Да, структура у вас есть в основном '(ключ: Int, (сумма: Int, count; Int))' –

Смежные вопросы