2015-06-02 2 views
0

Я передаю весь свой код на scala, и у меня была функция в pySpark, что я мало знаю, как перевести на scala. Может ли кто-нибудь помочь и дать объяснение? PySpark выглядит следующим образом:Spark AggregateByKey От pySpark до Scala

.aggregateByKey((0.0, 0.0, 0.0), 
         lambda (sum, sum2, count), value: (sum + value, sum2 + value**2, count+1.0), 
         lambda (suma, sum2a, counta), (sumb, sum2b, countb): (suma + sumb, sum2a + sum2b, counta + countb)) 

Edit: То, что я до сих пор:

val dataSusRDD = numFilterRDD.aggregateByKey((0,0,0), (sum, sum2, count) => 

Но то, что я имею трудности с пониманием, как вы пишете это в Скале из группы функции затем назначают значение в группу действий (сумма + значение и т. д.). во вторую совокупность всех функций с соответствующим синтаксисом. Его трудно согласовать мои проблемы в этом сценарии. Тем более, что я не понимаю scala и когда использовать скобки, vs круглые скобки, vs, comma

+0

SO is't действительно сервис перевода кода. Где вы застряли? Какие биты вы не понимаете? –

+0

Добавлено редактирование, объясняющее мои мысли, – theMadKing

+1

В ответе, который я написал здесь, есть пример Scala aggregateByKey: http://stackoverflow.com/a/29953122/21755. Начиная с написания не анонимных функций, возможно, кривая обучения будет немного менее крутой. –

ответ

3

Как @paul предлагает использовать именованные функции, возможно, понимание того, что происходит немного проще.

val initialValue = (0.0,0.0,0.0) 
def seqOp(u: (Double, Double, Double), v: Double) = (u._1 + v, u._2 + v*v, u._3 + 1) 
def combOp(u1: (Double, Double, Double), u2: (Double, Double, Double)) = (u1._1 + u2._1, u1._2 + u2._2, u1._3 + u2._3) 
rdd.aggregateByKey(initialValue)(seqOp, combOp) 
+0

Здравствуйте, я получаю следующую проблему: scala> val dataStatsRDD = numFilterRDD.aggregateByKey (initialValue, seqOp, combOp) : 38: error: too много аргументов для метода aggregateByKey: (zeroValue: U) (seqOp: (U, Double) => U, combOp: (U, U) => U) (неявное доказательство $ 3: scala.reflect.ClassTag [U]) org. apache.spark.rdd.RDD [(Int, U)] val dataStatsRDD = numFilterRDD.aggregateByKey (initialValue, seqOp, combOp) – theMadKing

+0

Плохо, я забыл, что его передали с частичным приложением в Scala. Я обновил ответ. 'rdd.aggregateByKey (initialValue) (seqOp, combOp) ' – Holden

Смежные вопросы