2016-04-28 2 views
0

Я хотел сделать рейтинг в искру, следующим образом:Сортировка и ранжирование в искровой скале apache?

Вход:

5.6 
5.6 
5.6 
6.2 
8.1 
5.5 
5.5 

Звания:

1 
1 
1 
2 
3 
0 
0 
0 

Выход:

Rank Input 
0  5.5 
0  5.5 
1  5.6 
1  5.6 
1  5.6 
2  6.2 
3  8.1 

Я хотел знать, как я могу сортировать их в искры, а также получить тот же рейтинг, что и выше. Требования:

  1. рейтинга начинается с 0 не 1
  2. это случай образца для миллионов записей и одного раздела может быть очень большим - я ценю рекомендации о том, как ранжировать с использованием внутреннего метода сортировки

Я хотел сделать это в scala. Может кто-нибудь помочь мне написать код для этого?

+0

Сколько различных результатов вы ожидаете? тысячи, миллионы? –

+0

Имеются ли ранги, или вы ожидаете, что ранг будет получен из вида ввода? Комментарий о применении ранга к индексу делает это непонятным для меня – brycemcd

+0

@AlbertoBonsanto, поэтому есть несколько случаев, ранжировать все или только топ 10 или 20. Мне нужно поддерживать все случаи. так что ответ - все миллионы. – happybayes

ответ

2

Если вы ожидаете только некоторых рангов вы могли бы сначала получить все distinct значение, собирающие их как List и превратить их в BroadCast. Ниже я покажу, грязный пример, обратите внимание, что это не гарантирует, что выход будет отсортирован (там может быть, вероятно, лучше всего подходит, но это первое, что приходит на ум):

// Case 1. k is small (fits in the driver and nodes) 
val rdd = sc.parallelize(List(1,1,44,4,1,33,44,1,2)) 
val distincts = rdd.distinct.collect.sortBy(x => x) 
val broadcast = sc.broadcast(distincts) 

val sdd = rdd.map{ 
    case i: Int => (broadcast.value.asInstanceOf[Array[Int]].indexOf(i), i) 
} 

sdd.collect() 

// Array[(Int, Int)] = Array((0,1), (0,1), (4,44), (2,4), (0,1), (3,33), (4,44), (0,1), (1,2)) 

В второй подход, который я сортирую, используя функциональность Spark, в RDD's documentation вы можете найти, как работают zipWithIndex и .

//case 2. k is big, distinct values don't fit in the Driver. 
val rdd = sc.parallelize(List(1,1,44,4,1,33,44,1,2)) 
val distincts = rdd.distinct.sortBy(x => x).zipWithIndex 
rdd.keyBy(x => x) 
    .join(distincts.keyBy(_._1)) 
    .map{ 
    case (value: Int, (v1: Int, (v2: Int, index: Long))) => (index, value) 
    }.collect() 

//res15: Array[(Long, Int)] = Array((3,33), (2,4), (0,1), (0,1), (0,1), (0,1), (4,44), (4,44), (1,2)) 

Кстати, я использую collect только для целей визуализации в реальном приложении вы не должны использовать его, если вы не уверены, что помещается в памяти водителя.

+0

Большое вам спасибо. он обеспечивает ожидаемые результаты.Поскольку меня больше беспокоит производительность, я хотел понять внутренности о том, как происходит сортировка. В случае, если один ключ с записями 100 тыс., Раздел будет огромным, поэтому интересно, что sortby является единственным вариантом или любым предложением по использованию библиотеки. Я использую то же самое в python, используя Numpy, и он действительно хорошо сортируется. Смотря что-то подобное. – happybayes

+0

Проблема заключается в том, что если k огромен, то как сортировка выполняется искру, путем перемещения нескольких значений между разделами, что действительно неэффективно; но я добавлю его в качестве случая 2. –

+0

спасибо, чтобы помочь мне учиться на этом. очень ценю это. – happybayes

Смежные вопросы