2015-08-10 4 views
0

Просто интересно, как я могу сделать следующее:Scala/Spark - Агрегирование РДД

Пусть у меня есть RDD, содержащий (имя пользователя, возраст, movieBought) для многих имен пользователей и некоторых линий может иметь такое же имя и возраст, но другой фильмBought.

Как удалить дублированные строки и преобразовать их в (имя пользователя, возраст, movieBought1, movieBought2 ...)?

С наилучшими пожеланиями

ответ

1
val grouped = rdd.groupBy(x => (x._1, x._2)).map(x => (x._1._1, x._1._2, x._2.map(_._3))) 

val results = grouped.collect.toList 

UPDATE (если каждый кортеж имеет также ряд фильмов пункта):

val grouped = rdd.groupBy(x => (x._1, x._2)).map(x => (x._1._1, x._1._2, x._2.map(m => (m._3, m._4)))) 

val results = grouped.collect.toList 
+0

Если бы я был дополнительный элемент в списке, таких как numberofmoviesbought1, то есть: (USERNAME, AGE, MOVIEBOUGHT1, NUMBERBOUGHT), как бы этот код может быть изменен, чтобы соответствовать этому параметру? Я попробовал возиться и не смог получить результат –

+0

@KevinZ, обновил ответ – ka4eli

+0

Так что бы мне дали: (имя пользователя, возраст, (фильм1, номер), (фильм2, номер) и т. Д.)? –

0

Я собирался предложить собрать и перечислить, но ka4eli бить меня к нему ,

Я думаю, вы также можете использовать groupBy/groupByKey, а затем уменьшить/уменьшитьByKey. Недостатком этого параметра является то, что результат (movie1, movie2, movie3 ..) объединяется в 1 строку (вместо структуры List, что затрудняет доступ к ней).

val group = rdd.map(x=>((x.name,x.age),x.movie))).groupBy(_._1) 
val result = group.map(x=>(x._1._1,x._1._2,x._2.map(y=>y._2).reduce(_+","+_) 
+0

«Недостатком этого из-за того, что результат (movie1, movie2, movie3 ..) объединяются в 1 строку «Только потому, что вы делаете string-concatente в своем сокращении. Вместо этого используйте aggregateByKey. –

Смежные вопросы