2015-05-11 3 views
11

Say У меня есть PairRDD как таковой (Очевидно, гораздо больше данных, в реальной жизни, предположим, что миллионы записей):Spark: Получить Top N по ключевым

val scores = sc.parallelize(Array(
     ("a", 1), 
     ("a", 2), 
     ("a", 3), 
     ("b", 3), 
     ("b", 1), 
     ("a", 4), 
     ("b", 4), 
     ("b", 2) 
)) 

Что является наиболее эффективным способом для создания RDD с верхние 2 счета за ключ?

val top2ByKey = ... 
res3: Array[(String, Int)] = Array((a,4), (a,3), (b,4), (b,3)) 

ответ

10

Я думаю, что это должно быть весьма эффективным:

Отредактировано согласно комментариям OP:

scores.mapValues(p => (p, p)).reduceByKey((u, v) => { 
    val values = List(u._1, u._2, v._1, v._2).sorted(Ordering[Int].reverse).distinct 
    if (values.size > 1) (values(0), values(1)) 
    else (values(0), values(0)) 
}).collect().foreach(println) 
+0

Это не работает?Это результат: Array [(String, (Int, Int))] = Array ((a, (4,4)), (b, (4,4))) –

+1

У меня это получилось, адаптировав ответ user52045 : знач баллов = sc.parallelize (Array ( ("а", 1), ("а", 2), ("а", 3), ("б", 3), ("б", 1), ("а", 4), ("б", 4), ("б", 2) )) scores.mapValues ​​(р => (р, р)) .reduceByKey ((u, v) => { значения val = список (u._1, u._2, v._1, v._2) .sorted (упорядочение [Int] .reverse) .distinct (значения (0), значения (1)) }). Collect() –

+1

@michael_erasmus Вы правы, что в моем коде есть ошибка. Thx для его исправления. Одна вещь, которую вы должны быть осторожны, потому что, если все элементы списка совпадают, вы получите outOfBoudException. – abalcerek

0
scores.reduceByKey(_ + _).map(x => x._2 -> x._1).sortByKey(false).map(x => x._2 -> x._1).take(2).foreach(println) 
+3

Привет, добро пожаловать в переполнение стека. Пожалуйста, не просто код дампа в качестве вашего ответа. Объясните свой ход мыслей, чтобы мы могли лучше понять это. Прочтите это, если у вас есть сомнения: http://stackoverflow.com/help/how-to-answer Спасибо. – Cthulhu

+1

Я верю, что scores.reduceByKey (_ + _) приведет к срыву всех пар с одним и тем же ключом, чтобы в итоге вы получили один (a, N) и один (b, M), где N и M были суммой всех значения и значения b соответственно. В этот момент вы получите только один (a, N) и отсутствие декомпозиции (a, i) и (a, j), где i и j - два самых высоких значения для всех пар. –

2

Немного изменены ваши входные данные.

val scores = sc.parallelize(Array(
     ("a", 1), 
     ("a", 2), 
     ("a", 3), 
     ("b", 3), 
     ("b", 1), 
     ("a", 4), 
     ("b", 4), 
     ("b", 2), 
     ("a", 6), 
     ("b", 8) 
    )) 

Я объясню, как это сделать шаг за шагом:

1.Group ключом для создания массива

scores.groupByKey().foreach(println) 

Результат:

(b,CompactBuffer(3, 1, 4, 2, 8)) 
(a,CompactBuffer(1, 2, 3, 4, 6)) 

Как вы видите, каждый само значение представляет собой массив чисел. CompactBuffer - это просто оптимизированный массив.

2.Для каждый ключ, реверс сортировки списка чисел, значение содержит

scores.groupByKey().map({ case (k, numbers) => k -> numbers.toList.sorted(Ordering[Int].reverse)}).foreach(println) 

Результат:

(b,List(8, 4, 3, 2, 1)) 
(a,List(6, 4, 3, 2, 1)) 

3.Keep только первые 2 элемента из 2-й стадии, они будут сверху 2 баллов в списке

scores.groupByKey().map({ case (k, numbers) => k -> numbers.toList.sorted(Ordering[Int].reverse).take(2)}).foreach(println) 

Результат:

(a,List(6, 4)) 
(b,List(8, 4)) 

4.Flat карта для создания новых спаренных RDD для каждого ключа и наивысший балл

scores.groupByKey().map({ case (k, numbers) => k -> numbers.toList.sorted(Ordering[Int].reverse).take(2)}).flatMap({case (k, numbers) => numbers.map(k -> _)}).foreach(println) 

Результат:

(b,8) 
(b,4) 
(a,6) 
(a,4) 

5.Optional шаг - сортировка по ключу, если вы хотите

scores.groupByKey().map({ case (k, numbers) => k -> numbers.toList.sorted(Ordering[Int].reverse).take(2)}).flatMap({case (k, numbers) => numbers.map(k -> _)}).sortByKey(false).foreach(println) 

Результаты:

(a,6) 
(a,4) 
(b,8) 
(b,4) 

Надеюсь, это объяснение помогло понять логику.

Смежные вопросы