У меня есть RDD, который очень длинный (несколько миллиардов строк) и достаточно широкий (несколько сотен столбцов). Я хочу создавать наборы уникальных значений в каждом столбце (эти множества не нужно распараллелить, так как они будут содержать не более 500 уникальных значений для каждого столбца).Найти различные значения для каждого столбца в RDD в PySpark
Вот то, что я до сих пор:
data = sc.parallelize([["a", "one", "x"], ["b", "one", "y"], ["a", "two", "x"], ["c", "two", "x"]])
num_columns = len(data.first())
empty_sets = [set() for index in xrange(num_columns)]
d2 = data.aggregate((empty_sets), (lambda a, b: a.add(b)), (lambda x, y: x.union(y)))
Что я делаю здесь пытается initate список пустых наборов, по одному для каждого столбца в моем РДУ. Для первой части агрегации я хочу итерации по строкам через data
, добавляя значение в столбце n
к n
-му набору в моем списке наборов. Если значение уже существует, оно ничего не делает. Затем он выполняет union
наборов после этого, поэтому только разные значения возвращаются по всем разделам.
Когда я пытаюсь запустить этот код, я получаю следующее сообщение об ошибке:
AttributeError: 'list' object has no attribute 'add'
Я считаю, что проблема в том, что я точно не давая понять, что я итерация по списку наборов (empty_sets
) и что я повторяю столбцы каждой строки в data
. Я считаю, что в (lambda a, b: a.add(b))
a
- empty_sets
и b
- data.first()
(вся строка, ни одно значение). Это, очевидно, не работает, и это не мое намеренное объединение.
Как я могу перебирать через свой список наборов и через каждую строку моего блока данных добавлять каждое значение к соответствующему заданному объекту?
Нужный результат будет выглядеть следующим образом:
[set(['a', 'b', 'c']), set(['one', 'two']), set(['x', 'y'])]
PS Я смотрел на этот пример here, что чрезвычайно похож на моего случая использования (это где у меня появилась идея использовать aggregate
в первую очередь). Тем не менее, я считаю, что код очень сложно преобразовать в PySpark, и я очень не совсем понимаю, что делают коды и zip
.