Решение состоит в перебирать списке результатов значений заселенных из groupByKey(), а затем извлекая topN записывает и добавляет эти значения в новый список. Ниже приведен рабочий пример, вы можете выполнить его на Cloudera VM, поскольку я использовал набор данных образцов Cloudera. Перед выполнением его убедитесь, что у вас есть продукт RDD, сгенерированный из таблицы продуктов, который существует в базе данных mySql - retail_db.
getTopN функция ->
def getTopN(rec: (String, Iterable[String]), topN: Int): Iterable[String] = {
var prodPrices: List[Float] = List()
var topNPrices: List[Float] = List()
var sortedRecs: List[String] = List()
for(i <- rec._2) {
prodPrices = prodPrices:+ i.split(",")(4).toFloat
}
topNPrices = prodPrices.distinct.sortBy(k => -k).take(topN)
sortedRecs = rec._2.toList.sortBy(k => -k.split(",")(4).toFloat)
var x: List[String] = List()
for(i <- sortedRecs) {
if(topNPrices.contains(i.split(",")(4).toFloat))
x = x:+ i
}
return x
}
Главный код ->
##code to generate products RDD
val productsMap = products.
map(rec => (rec.split(",")(1), rec))
productsMap.
groupByKey().
flatMap(x => getTopN(x, 3)).
collect().
foreach(println)
Пожалуйста иллюстрировать ваш вопрос, например, набор данных, попытки кода и ожидаемого результата. – mtoto