2016-04-16 3 views
1

У меня есть rdd с несколькими значениями (списком) против одного ключа, я хочу отфильтровать мусор из каждого значения в ключе.карта с несколькими значениями одного ключевого pyspark

рдда имеет эти данные

((key1, [('',val1),('', val2),..]),(key2,[...) 

Я хочу, чтобы отобразить его на что-то вроде этого

((key1,[val1, val2,...]), key2[...) 

Я знаю, здесь требуется функция карты, но я не использовал карту для нескольких значений против ключ.

Это мое усилие для этого.

def mapper(x): 
    values = [] 
    for a in x[1]: 
     values.append(a[1]) 
    return(x[0], ap) 
listRdd.map(mapper).collect() 

, но я получаю несколько ошибок

ответ

1

Основная идея заключается в том, чтобы рассмотреть каждую запись в РДУ как единый процесс сбора его как так. Смысл, если мы рассмотрим следующую запись

entry = ("key1", [('',"val1"),('',"val2")]) 

обрабатывать эту коллекцию в ожидаемом выходе, мы должны понять структуру коллекции

entry[0] 
# 'key1' 

entry[1] 
# [('', 'val1'), ('', 'val2')] 

теперь давайте работать на этой второй части:

map(lambda x : x[1],entry[1]) 
# ['val1', 'val2'] 

Теперь мы можем определить функцию, которая принимает запись как входной сигнал, а результирующий вывод будет (ключ, [значения ...]) кортежа. Назовем это mapper. Мы можем применить отображение к каждой записи в rdd.

Ввод кода вместе:

def mapper(entry): 
    return (entry[0],map(lambda x : x[1],entry[1])) 

data = [("key1", [('',"val1"),('',"val2")]),("key2",[('',"val3"),('',"val2"),('',"val4")])] 

rdd = sc.parallelize(data) 

rdd2 = rdd.map(lambda x : mapper(x)) 

rdd2.collect() 
# [('key1', ['val1', 'val2']), ('key2', ['val3', 'val2', 'val4'])] 
+1

Я сделал то же самое, но там была ошибка синтаксиса в функции отображения, спасибо за объяснение, хотя –

+0

Вас! – eliasah