2015-11-16 5 views
-1

Я хочу ранжировать идентификатор пользователя на основе одного поля. Для того же значения поля, ранг должен быть таким же. Эти данные находятся в таблице Hive.Вычисление ранга строки

например.

user value 
a  5 
b  10 
c  5 
d  6 

Rank 
a - 1 
c - 1 
d - 3 
b - 4 

Как я могу это сделать?

ответ

16

Можно использовать rank функцию окна либо с DataFrame API:

import org.apache.spark.sql.functions.rank 
import org.apache.spark.sql.expressions.Window 

val w = Window.orderBy($"value") 

val df = sc.parallelize(Seq(
    ("a", 5), ("b", 10), ("c", 5), ("d", 6) 
)).toDF("user", "value") 

df.select($"user", rank.over(w).alias("rank")).show 

// +----+----+ 
// |user|rank| 
// +----+----+ 
// | a| 1| 
// | c| 1| 
// | d| 3| 
// | b| 4| 
// +----+----+ 

или сырые SQL:

df.registerTempTable("df") 
sqlContext.sql("SELECT user, RANK() OVER (ORDER BY value) AS rank FROM df").show 

// +----+----+ 
// |user|rank| 
// +----+----+ 
// | a| 1| 
// | c| 1| 
// | d| 3| 
// | b| 4| 
// +----+----+ 

но это крайне неэффективно.

Вы также можете попробовать использовать RDD API, но это не совсем просто. Первый позволяет конвертировать DataFrame в РДУ:

import org.apache.spark.sql.Row 
import org.apache.spark.rdd.RDD 
import org.apache.spark.RangePartitioner 

val rdd: RDD[(Int, String)] = df.select($"value", $"user") 
    .map{ case Row(value: Int, user: String) => (value, user) } 

val partitioner = new RangePartitioner(rdd.partitions.size, rdd) 
val sorted = rdd.repartitionAndSortWithinPartitions(partitioner) 

Далее мы должны вычислить ранги за раздел:

def rank(iter: Iterator[(Int,String)]) = { 
    val zero = List((-1L, Integer.MIN_VALUE, "", 1L)) 

    def f(acc: List[(Long,Int,String,Long)], x: (Int, String)) = 
    (acc.head, x) match { 
     case (
     (prevRank: Long, prevValue: Int, _, offset: Long), 
     (currValue: Int, label: String)) => { 
     val newRank = if (prevValue == currValue) prevRank else prevRank + offset 
     val newOffset = if (prevValue == currValue) offset + 1L else 1L 
     (newRank, currValue, label, newOffset) :: acc 
    } 
    } 

    iter.foldLeft(zero)(f).reverse.drop(1).map{case (rank, _, label, _) => 
    (rank, label)}.toIterator 
} 


val partRanks = sorted.mapPartitions(rank) 

смещения для каждого раздела

def getOffsets(sorted: RDD[(Int, String)]) = sorted 
    .mapPartitionsWithIndex((i: Int, iter: Iterator[(Int, String)]) => 
    Iterator((i, iter.size))) 
    .collect 
    .foldLeft(List((-1, 0)))((acc: List[(Int, Int)], x: (Int, Int)) => 
    (x._1, x._2 + acc.head._2) :: acc) 
    .toMap 

val offsets = sc.broadcast(getOffsets(sorted)) 

и конечных рядов:

def adjust(i: Int, iter: Iterator[(Long, String)]) = 
    iter.map{case (rank, label) => (rank + offsets.value(i - 1).toLong, label)} 

val ranks = partRanks 
    .mapPartitionsWithIndex(adjust) 
    .map{case (i, label) => (1 + i , label)} 
+0

Я думаю, что это отличный ответ, однако, coul d мы получаем более подробную информацию о том, почему API-интерфейс dataframe здесь неэффективен? – BlueSky

+0

@BlueSky Поскольку определение 'Window' без' partitionBy' перетасовывает все в один раздел. С сегодняшним API 'Daset' вы можете переписать версию« RDD ». – zero323

Смежные вопросы