2016-12-22 2 views
0

Я придерживаюсь аналогичного варианта использования, как в SPARK DataFrame: select the first row of each group. Единственное отличие состоит в том, что мне нужно выбрать первые три строки каждой группы. Функция agg позволяет мне выбрать верхнее значение с помощью функции max или сначала применить sort, а затем использовать функцию first.SPARK DataFrame: выберите первые 3 строки каждой группы

Есть ли способ достичь этого, используя agg функция после выполнения groupby? Если нет, то каков наилучший способ сделать это?

+1

Пожалуйста иллюстрировать ваш вопрос, например, набор данных, попытки кода и ожидаемого результата. – mtoto

ответ

0

Используйте функции окна с row_number как в связанном вопросе, но заменить:

.where($"rn" === 1) 

с

.where($"rn" <= 3) 
0

Решение состоит в перебирать списке результатов значений заселенных из groupByKey(), а затем извлекая topN записывает и добавляет эти значения в новый список. Ниже приведен рабочий пример, вы можете выполнить его на Cloudera VM, поскольку я использовал набор данных образцов Cloudera. Перед выполнением его убедитесь, что у вас есть продукт RDD, сгенерированный из таблицы продуктов, который существует в базе данных mySql - retail_db.

getTopN функция ->

def getTopN(rec: (String, Iterable[String]), topN: Int): Iterable[String] = { 
      var prodPrices: List[Float] = List() 
      var topNPrices: List[Float] = List() 
      var sortedRecs: List[String] = List() 
      for(i <- rec._2) { 
      prodPrices = prodPrices:+ i.split(",")(4).toFloat 
      } 
      topNPrices = prodPrices.distinct.sortBy(k => -k).take(topN) 
      sortedRecs = rec._2.toList.sortBy(k => -k.split(",")(4).toFloat) 
      var x: List[String] = List() 
      for(i <- sortedRecs) { 
      if(topNPrices.contains(i.split(",")(4).toFloat)) 
       x = x:+ i 
      } 
      return x 
     } 

Главный код ->

##code to generate products RDD 

val productsMap = products. 
    map(rec => (rec.split(",")(1), rec)) 
productsMap. 
    groupByKey(). 
    flatMap(x => getTopN(x, 3)). 
    collect(). 
    foreach(println) 
4
import org.apache.spark.sql.functions.{rowNumber, max, broadcast} 
import org.apache.spark.sql.expressions.Window 

df=Dataframe.... 

val w = Window.partitionBy($"groupColumn").orderBy($"AnyColumn".desc) 

val dfTop = df.withColumn("rn", rowNumber.over(w)).where($"rn" ===> 3).drop("rn") 
dfTop.show