Я пытаюсь найти способ вычисления Медиан для данного Dataframe.Spark Scala: Пользовательская агрегированная функция, которая вычисляет медиану
val df = sc.parallelize(Seq(("a",1.0),("a",2.0),("a",3.0),("b",6.0), ("b", 8.0))).toDF("col1", "col2")
+----+----+
|col1|col2|
+----+----+
| a| 1.0|
| a| 2.0|
| a| 3.0|
| b| 6.0|
| b| 8.0|
+----+----+
Теперь я хочу сделать что-н так:
df.groupBy("col1").agg(calcmedian("col2"))
результат должен выглядеть следующим образом:
+----+------+
|col1|median|
+----+------+
| a| 2.0|
| b| 7.0|
+----+------+`
поэтому calcmedian() должен быть UDAF, но проблема метод «оценки» UDAF принимает только строку, но мне нужна вся таблица для сортировки значений и возврата медианы ...
// Once all entries for a group are exhausted, spark will evaluate to get the final result
def evaluate(buffer: Row) = {...}
Возможно ли это как-то? или есть еще один хороший обход? Я хочу подчеркнуть, что я знаю, как вычислить медиану в наборе данных с «одной группой». Но я не хочу использовать этот алгоритм в цикле «foreach», поскольку это неэффективно!
Спасибо!
редактировать:
that's, что я пытался до сих пор:
object calcMedian extends UserDefinedAggregateFunction {
// Schema you get as an input
def inputSchema = new StructType().add("col2", DoubleType)
// Schema of the row which is used for aggregation
def bufferSchema = new StructType().add("col2", DoubleType)
// Returned type
def dataType = DoubleType
// Self-explaining
def deterministic = true
// initialize - called once for each group
def initialize(buffer: MutableAggregationBuffer) = {
buffer(0) = 0.0
}
// called for each input record of that group
def update(buffer: MutableAggregationBuffer, input: Row) = {
buffer(0) = input.getDouble(0)
}
// if function supports partial aggregates, spark might (as an optimization) comput partial results and combine them together
def merge(buffer1: MutableAggregationBuffer, buffer2: Row) = {
buffer1(0) = input.getDouble(0)
}
// Once all entries for a group are exhausted, spark will evaluate to get the final result
def evaluate(buffer: Row) = {
val tile = 50
var median = 0.0
//PROBLEM: buffer is a Row --> I need DataFrame here???
val rdd_sorted = buffer.sortBy(x => x)
val c = rdd_sorted.count()
if (c == 1){
median = rdd_sorted.first()
}else{
val index = rdd_sorted.zipWithIndex().map(_.swap)
val last = c
val n = (tile/ 100d) * (c*1d)
val k = math.floor(n).toLong
val d = n - k
if(k <= 0) {
median = rdd_sorted.first()
}else{
if (k <= c){
median = index.lookup(last - 1).head
}else{
if(k >= c){
median = index.lookup(last - 1).head
}else{
median = index.lookup(k-1).head + d* (index.lookup(k).head - index.lookup(k-1).head)
}
}
}
}
} //end of evaluate
Вы должны 'groupByKey', преобразование агрегированные данные в «Буфер» есть некоторые «UDF's» для достижения этого, а затем вы создаете UDF для вычисления медианы. –
Базовый класс 'UserDefinedAggregateFunction' имеет гораздо больше членов, чем просто' оценка', которые необходимо реализовать. Буфер 'Row', переданный в' оценка', является самым последним шагом. вы пробовали какую-либо реализацию, и если да, то можете ли вы показать свой код до сих пор? – mattinbits
@mattinbits: я добавил код, о котором я думал до сих пор .... – johntechendso