Если вы ожидаете только некоторых рангов вы могли бы сначала получить все distinct
значение, собирающие их как List
и превратить их в BroadCast
. Ниже я покажу, грязный пример, обратите внимание, что это не гарантирует, что выход будет отсортирован (там может быть, вероятно, лучше всего подходит, но это первое, что приходит на ум):
// Case 1. k is small (fits in the driver and nodes)
val rdd = sc.parallelize(List(1,1,44,4,1,33,44,1,2))
val distincts = rdd.distinct.collect.sortBy(x => x)
val broadcast = sc.broadcast(distincts)
val sdd = rdd.map{
case i: Int => (broadcast.value.asInstanceOf[Array[Int]].indexOf(i), i)
}
sdd.collect()
// Array[(Int, Int)] = Array((0,1), (0,1), (4,44), (2,4), (0,1), (3,33), (4,44), (0,1), (1,2))
В второй подход, который я сортирую, используя функциональность Spark, в RDD's documentation вы можете найти, как работают zipWithIndex
и .
//case 2. k is big, distinct values don't fit in the Driver.
val rdd = sc.parallelize(List(1,1,44,4,1,33,44,1,2))
val distincts = rdd.distinct.sortBy(x => x).zipWithIndex
rdd.keyBy(x => x)
.join(distincts.keyBy(_._1))
.map{
case (value: Int, (v1: Int, (v2: Int, index: Long))) => (index, value)
}.collect()
//res15: Array[(Long, Int)] = Array((3,33), (2,4), (0,1), (0,1), (0,1), (0,1), (4,44), (4,44), (1,2))
Кстати, я использую collect
только для целей визуализации в реальном приложении вы не должны использовать его, если вы не уверены, что помещается в памяти водителя.
Сколько различных результатов вы ожидаете? тысячи, миллионы? –
Имеются ли ранги, или вы ожидаете, что ранг будет получен из вида ввода? Комментарий о применении ранга к индексу делает это непонятным для меня – brycemcd
@AlbertoBonsanto, поэтому есть несколько случаев, ранжировать все или только топ 10 или 20. Мне нужно поддерживать все случаи. так что ответ - все миллионы. – happybayes