3

Я пытаюсь найти способ вычисления Медиан для данного Dataframe.Spark Scala: Пользовательская агрегированная функция, которая вычисляет медиану

val df = sc.parallelize(Seq(("a",1.0),("a",2.0),("a",3.0),("b",6.0), ("b", 8.0))).toDF("col1", "col2") 

+----+----+ 
|col1|col2| 
+----+----+ 
| a| 1.0| 
| a| 2.0| 
| a| 3.0| 
| b| 6.0| 
| b| 8.0| 
+----+----+ 

Теперь я хочу сделать что-н так:
df.groupBy("col1").agg(calcmedian("col2"))

результат должен выглядеть следующим образом:

+----+------+ 
|col1|median| 
+----+------+ 
| a| 2.0| 
| b| 7.0| 
+----+------+` 

поэтому calcmedian() должен быть UDAF, но проблема метод «оценки» UDAF принимает только строку, но мне нужна вся таблица для сортировки значений и возврата медианы ...

// Once all entries for a group are exhausted, spark will evaluate to get the final result 
def evaluate(buffer: Row) = {...} 

Возможно ли это как-то? или есть еще один хороший обход? Я хочу подчеркнуть, что я знаю, как вычислить медиану в наборе данных с «одной группой». Но я не хочу использовать этот алгоритм в цикле «foreach», поскольку это неэффективно!

Спасибо!


редактировать:

that's, что я пытался до сих пор:

object calcMedian extends UserDefinedAggregateFunction { 
    // Schema you get as an input 
    def inputSchema = new StructType().add("col2", DoubleType) 
    // Schema of the row which is used for aggregation 
    def bufferSchema = new StructType().add("col2", DoubleType) 
    // Returned type 
    def dataType = DoubleType 
    // Self-explaining 
    def deterministic = true 
    // initialize - called once for each group 
    def initialize(buffer: MutableAggregationBuffer) = { 
     buffer(0) = 0.0 
    } 

    // called for each input record of that group 
    def update(buffer: MutableAggregationBuffer, input: Row) = { 
     buffer(0) = input.getDouble(0) 
    } 
    // if function supports partial aggregates, spark might (as an optimization) comput partial results and combine them together 
    def merge(buffer1: MutableAggregationBuffer, buffer2: Row) = { 
     buffer1(0) = input.getDouble(0) 
    } 
    // Once all entries for a group are exhausted, spark will evaluate to get the final result 
    def evaluate(buffer: Row) = { 
     val tile = 50 
     var median = 0.0 

     //PROBLEM: buffer is a Row --> I need DataFrame here??? 
     val rdd_sorted = buffer.sortBy(x => x) 
     val c = rdd_sorted.count() 
     if (c == 1){ 
      median = rdd_sorted.first()     
     }else{ 
      val index = rdd_sorted.zipWithIndex().map(_.swap) 
      val last = c 
      val n = (tile/ 100d) * (c*1d) 
      val k = math.floor(n).toLong  
      val d = n - k 
      if(k <= 0) { 
       median = rdd_sorted.first() 
      }else{ 
       if (k <= c){ 
        median = index.lookup(last - 1).head 
       }else{ 
        if(k >= c){ 
         median = index.lookup(last - 1).head 
        }else{ 
         median = index.lookup(k-1).head + d* (index.lookup(k).head - index.lookup(k-1).head) 
        } 
       } 
      } 
     } 
    } //end of evaluate 
+0

Вы должны 'groupByKey', преобразование агрегированные данные в «Буфер» есть некоторые «UDF's» для достижения этого, а затем вы создаете UDF для вычисления медианы. –

+0

Базовый класс 'UserDefinedAggregateFunction' имеет гораздо больше членов, чем просто' оценка', которые необходимо реализовать. Буфер 'Row', переданный в' оценка', является самым последним шагом. вы пробовали какую-либо реализацию, и если да, то можете ли вы показать свой код до сих пор? – mattinbits

+0

@mattinbits: я добавил код, о котором я думал до сих пор .... – johntechendso

ответ

4

попробовать это:

import org.apache.spark.functions._ 

val result = data.groupBy("col1").agg(callUDF("percentile_approx", col("col2"), lit(0.5))) 
Смежные вопросы