2016-04-26 4 views
1

У меня есть RDD, который очень длинный (несколько миллиардов строк) и достаточно широкий (несколько сотен столбцов). Я хочу создавать наборы уникальных значений в каждом столбце (эти множества не нужно распараллелить, так как они будут содержать не более 500 уникальных значений для каждого столбца).Найти различные значения для каждого столбца в RDD в PySpark

Вот то, что я до сих пор:

data = sc.parallelize([["a", "one", "x"], ["b", "one", "y"], ["a", "two", "x"], ["c", "two", "x"]]) 
num_columns = len(data.first()) 
empty_sets = [set() for index in xrange(num_columns)] 
d2 = data.aggregate((empty_sets), (lambda a, b: a.add(b)), (lambda x, y: x.union(y))) 

Что я делаю здесь пытается initate список пустых наборов, по одному для каждого столбца в моем РДУ. Для первой части агрегации я хочу итерации по строкам через data, добавляя значение в столбце n к n-му набору в моем списке наборов. Если значение уже существует, оно ничего не делает. Затем он выполняет union наборов после этого, поэтому только разные значения возвращаются по всем разделам.

Когда я пытаюсь запустить этот код, я получаю следующее сообщение об ошибке:

AttributeError: 'list' object has no attribute 'add'

Я считаю, что проблема в том, что я точно не давая понять, что я итерация по списку наборов (empty_sets) и что я повторяю столбцы каждой строки в data. Я считаю, что в (lambda a, b: a.add(b))a - empty_sets и b - data.first() (вся строка, ни одно значение). Это, очевидно, не работает, и это не мое намеренное объединение.

Как я могу перебирать через свой список наборов и через каждую строку моего блока данных добавлять каждое значение к соответствующему заданному объекту?

Нужный результат будет выглядеть следующим образом:

[set(['a', 'b', 'c']), set(['one', 'two']), set(['x', 'y'])]


PS Я смотрел на этот пример here, что чрезвычайно похож на моего случая использования (это где у меня появилась идея использовать aggregate в первую очередь). Тем не менее, я считаю, что код очень сложно преобразовать в PySpark, и я очень не совсем понимаю, что делают коды и zip.

ответ

1

Есть две проблемы. Во-первых, ваши функции объединителя предполагают, что каждая строка представляет собой единый набор, но вы работаете в списке наборов. Два, add ничего не возвращают (попробуйте a = set(); b = a.add('1'); print b), поэтому ваша первая функция сумматора возвращает список None s. Чтобы исправить это, сделайте свою первую функцию объединителя не анонимной, и оба они перейдут по спискам множеств:

def set_plus_row(sets, row): 
    for i in range(len(sets)): 
     sets[i].add(row[i]) 
    return sets 


unique_values_per_column = data.aggregate(
    empty_sets, 
    set_plus_row, # can't be lambda b/c add doesn't return anything 
    lambda x, y: [a.union(b) for a, b in zip(x, y)] 
) 

Я не уверен, что zip делает в Scala, но в Python, он принимает два списка и помещает каждый соответствующий элемент вместе в кортежи (попробуйте x = [1, 2, 3]; y = ['a', 'b', 'c']; print zip(x, y);), так что вы можете цикл в течение двух списков одновременно.

Смежные вопросы