Немного изменены ваши входные данные.
val scores = sc.parallelize(Array(
("a", 1),
("a", 2),
("a", 3),
("b", 3),
("b", 1),
("a", 4),
("b", 4),
("b", 2),
("a", 6),
("b", 8)
))
Я объясню, как это сделать шаг за шагом:
1.Group ключом для создания массива
scores.groupByKey().foreach(println)
Результат:
(b,CompactBuffer(3, 1, 4, 2, 8))
(a,CompactBuffer(1, 2, 3, 4, 6))
Как вы видите, каждый само значение представляет собой массив чисел. CompactBuffer - это просто оптимизированный массив.
2.Для каждый ключ, реверс сортировки списка чисел, значение содержит
scores.groupByKey().map({ case (k, numbers) => k -> numbers.toList.sorted(Ordering[Int].reverse)}).foreach(println)
Результат:
(b,List(8, 4, 3, 2, 1))
(a,List(6, 4, 3, 2, 1))
3.Keep только первые 2 элемента из 2-й стадии, они будут сверху 2 баллов в списке
scores.groupByKey().map({ case (k, numbers) => k -> numbers.toList.sorted(Ordering[Int].reverse).take(2)}).foreach(println)
Результат:
(a,List(6, 4))
(b,List(8, 4))
4.Flat карта для создания новых спаренных RDD для каждого ключа и наивысший балл
scores.groupByKey().map({ case (k, numbers) => k -> numbers.toList.sorted(Ordering[Int].reverse).take(2)}).flatMap({case (k, numbers) => numbers.map(k -> _)}).foreach(println)
Результат:
(b,8)
(b,4)
(a,6)
(a,4)
5.Optional шаг - сортировка по ключу, если вы хотите
scores.groupByKey().map({ case (k, numbers) => k -> numbers.toList.sorted(Ordering[Int].reverse).take(2)}).flatMap({case (k, numbers) => numbers.map(k -> _)}).sortByKey(false).foreach(println)
Результаты:
(a,6)
(a,4)
(b,8)
(b,4)
Надеюсь, это объяснение помогло понять логику.
Это не работает?Это результат: Array [(String, (Int, Int))] = Array ((a, (4,4)), (b, (4,4))) –
У меня это получилось, адаптировав ответ user52045 : знач баллов = sc.parallelize (Array ( ("а", 1), ("а", 2), ("а", 3), ("б", 3), ("б", 1), ("а", 4), ("б", 4), ("б", 2) )) scores.mapValues (р => (р, р)) .reduceByKey ((u, v) => { значения val = список (u._1, u._2, v._1, v._2) .sorted (упорядочение [Int] .reverse) .distinct (значения (0), значения (1)) }). Collect() –
@michael_erasmus Вы правы, что в моем коде есть ошибка. Thx для его исправления. Одна вещь, которую вы должны быть осторожны, потому что, если все элементы списка совпадают, вы получите outOfBoudException. – abalcerek