2015-05-28 3 views
2

У меня есть Спарк DataFrame, который выглядит как:Добавить агрегатный Колонка Спарк DataFrame

| id | value | bin | 
|----+-------+-----| 
| 1 | 3.4 | 2 | 
| 2 | 2.6 | 1 | 
| 3 | 1.8 | 1 | 
| 4 | 9.6 | 2 | 

У меня есть функция f, которая принимает массив значений и возвращает число. Я хочу, чтобы добавить столбец к вышеупомянутому кадра данных, где значение для нового столбца в каждой строке значение f для всех value записей, которые имеют ту же bin записи, то есть:

| id | value | bin | f_value  | 
|----+-------+-----+---------------| 
| 1 | 3.4 | 2 | f([3.4, 9.6]) | 
| 2 | 2.6 | 1 | f([2.6, 1.8]) | 
| 3 | 1.8 | 1 | f([2.6, 1.8]) | 
| 4 | 9.6 | 2 | f([3.4, 9.6]) | 

Поскольку мне нужно для суммирования всех value s за bin, я не могу использовать функцию withColumn, чтобы добавить этот новый столбец. Каков наилучший способ сделать это, пока пользовательские функции агрегации не превратятся в Spark?

ответ

2

Ниже код не проверен, а просто идея.

В улье, это можно сделать следующим образом, используя collect_list.

val newDF = sqlContext.sql(
    "select bin, collect_list() from aboveDF group by bin") 

Следующая joinaboveDF и newDF на бункере.

Это вы что искали?

+0

Похоже, что это сработает, но я надеялся не призывать к соединению. Благодаря! – calstad

Смежные вопросы