2016-12-07 3 views
2

У меня есть файл данных PySpark с одним столбцом в виде одного горячего кодированного вектора. Я хочу агрегировать разные горячие кодированные векторы путем добавления векторов после группыПользовательская агрегация на фреймах данных pyspark

, например. df[userid,action] Row1: ["1234","[1,0,0]] Row2: ["1234", [0 1 0]]

Я хочу, чтобы выход как строка: ["1234", [ 1 1 0]] поэтому вектор представляет собой сумму всех векторов, сгруппированных по userid.

Как я могу это достичь? Операция суммирования сумм PySpark не поддерживает добавление вектора.

ответ

4

У вас есть несколько вариантов:

  1. Создание определенного пользователя агрегатная функции. Проблема в том, что вам нужно будет write the user defined aggregate function in scala и wrap it to use in python.
  2. Вы можете использовать функцию collect_list, чтобы собрать все значения в список, а затем написать UDF для их объединения.
  3. Вы можете перейти на RDD и использовать агрегат или агрегат по ключу.

Оба варианта 2 & 3 будет относительно неэффективным (стоимость как процессора, так и памяти).

+0

Я понимаю, почему (2) неэффективен, поскольку он перемещает все данные для сбора элементов в реальных списках. Но почему (3) неэффективно? –

+1

@ThomasB. 3 неэффективен, потому что вы потеряете все оптимизаторы данных (например, оптимизация каталогов, целая система и т. Д.), Так как ваш ввод данных является фреймворком данных, он также должен будет преобразовать данные из представления данных данных в представление RDD. Наконец, в python все операции RDD в основном сериализуют данные на python, имеют на них python и возвращают результаты, которые относительно медленны. –

Смежные вопросы