2015-01-26 3 views
-1

У меня есть массив Array [(Int, String)], который состоит из пар ключ-значение для всего набора данных, где ключ - это номер столбца, а значение - значение столбца.Пользовательская функция внутри reduceByKey in spark

Итак, я хочу использовать reduceByKey для выполнения определенных операций, таких как max, min, mean, median, quartile вычисления по ключу.

Как добиться этого, используя reduceByKey, поскольку groupByKey проливает много данных на диск. Как передать пользовательскую функцию внутри reduceByKey.

Или есть лучший способ сделать это.

Спасибо!

+1

Как вы планируете вычислять max, min, mean и т. Д. По значениям String? – pzecevic

+1

reduceByKey уже выполняет (пользовательскую) функцию. Итак, каков ваш реальный вопрос? –

+0

@Paul ... да, я могу использовать пользовательскую функцию внутри reduceByKey, чтобы найти min. Но я хочу рассчитать значение min, max и mean внутри одной пользовательской функции. Является ли это возможным. Защиту MyFunc (х: распашные у: Double) = { , если (х> у) х еще у } –

ответ

7

Вы можете использовать combByKey для отслеживания сумм, count, min, max значений, все в одном преобразовании. Для этого вам нужно 3 функции:

  • создать функцию объединителя - это будет инициализировать «комбинированное значение», состоящее из мин, макс и т.д. функции
  • значения Merge - что добавит еще одно значение для «общей стоимостью»
  • слияние сумматоров - это будет объединить два «объединенных значений» вместе

второго подход заключается в том, чтобы использовать накапливаемый объект или несколько аккумуляторов.

Пожалуйста, проверьте документацию. При необходимости я могу привести несколько примеров.

Update:

Ниже приведен пример расчета среднего по ключу. Вы можете развернуть его, чтобы рассчитать min и max, тоже:

def createComb = (v:Double) => (1, v) 

def mergeVal:((Int,Double),Double)=>(Int,Double) =      
     {case((c,s),v) => (c+1, s+v)} 

def mergeComb:((Int,Double),(Int,Double))=>(Int,Double) = 
     {case((c1,s1),(c2,s2)) => (c1+c2, s1+s2)} 

val avgrdd = rdd.combineByKey(createComb, mergeVal, mergeComb, 
     new org.apache.spark.HashPartitioner(rdd.partitions.size)) 
     .mapValues({case(x,y)=>y/x}) 
+0

Спасибо pzecevic .... Можете ли вы предоставить некоторые пример/ссылка для него. –

+0

Я понял, как это работает, но мне нужно написать три разные функции для каждой операции, такие как min, max и т. Д. Бит путают, как это можно сделать. –