2016-11-14 4 views
0

мне удалось предварительно процесс моих данных в pyspark, чтобы получить что-то вроде этогоФильтрация данных в РДУ

[(u'key1', u'1'), (u'key2', u'1'), (u'key1', u'2'), (u'key3', u'2'), (u'key4', u'1'), (u'key1', u'4'), (u'key5', u'1'), (u'key6', u'2'), (u'key7', u'4'), (u'key8', u'5'), (u'key9', u'6'), (u'key10', u'7')] 

Теперь мне нужно фильтровать на основе этих условий:

1) значения фильтра, связанное с по крайней мере 2 ключа.

выход - только те, (K, V) пар, который имеет '1', '2', '4', как значения должны присутствовать, так как они связаны с более чем 2 ключами

[(u'key1', u'1'), (u'key2', u'1'), (u'key1', u'2'), (u'key3', u'2'), (u'key4', u'1'), (u'key1', u'4'), (u'key5', u'1'), (u'key6', u'2'), (u'key2', u'4')] 

2) ключи фильтра, которые связаны с по крайней мере 2-х значений

выход - только те, (K, V) пар, который имеет key1, key2, как ключи должны быть там, так как они связаны с по крайней мере 2-х значений

[(u'key1', u'1'), (u'key2', u'1'), (u'key1', u'2'), (u'key1', u'4'), (u'key2', u'4')] 

Любой предполагают ионы будут очень полезны.

Обновление: Я использовал GroupBy и фильтр для группы для ключей с Mutiple значений

[(u'key1', [u'1', u'2', u'4']), (u'key2',[u'1', u'4'])] 

Теперь, как я разделить этот (ключ, список (значения)) для индивидуального (к, v) пара применять дальнейшая трансформация?

+0

Вы можете сделать все за один проход - уменьшитьByKey, фильтровать элементы, которые имеют более 2 значений, а затем собирать или обрабатывать все, что есть. С какими особенностями у вас возникают проблемы? – khachik

+0

@khachik Сокращение по ключу будет агрегировать на основе ключа правильно? поэтому он даст что-то вроде (u'key1 ', u'1,2,3'), если я присоединяюсь к значениям в reduceByKey, разделенном ','. Мне не нужны мои данные для агрегирования. пожалуйста, поправьте меня, если я ошибаюсь. – Magic

+0

Если я использую reduceByKey, а затем, когда он агрегирован, я могу использовать фильтр для фильтрации только тех, у которых более двух значений. Теперь как выполнить второй цикл фильтрации для фильтрации ключей, которые связаны с более чем двумя значениями? – Magic

ответ

1
my_rdd = sc.parallelize([(u'key1', u'1'), (u'key2', u'1'), (u'key1', u'2'), (u'key2', u'3'), (u'key4', u'1'), (u'key1', u'4'), (u'key4', u'1'), (u'key6', u'2'), (u'key7', u'4'), (u'key8', u'5'), (u'key9', u'6'), (u'key10', u'7')]) 

#filter keys which are associated to atleast 2 values 

filter2_rdd = my_rdd.groupByKey() \ 
        .mapValues(lambda x: list(x)) \ 
        .filter(lambda x: len(x[1])>=2) \ 
        .flatMap(lambda x: [(x[0],item) for item in x[1]]) 

#filter values associated to atleast 2 keys. 
filte1_rdd = filter2_rdd.map(lambda x: (x[1],x[0])) \ 
         .groupByKey().mapValues(lambda x: list(x))\ 
         .filter(lambda x: len(x[1])>=2)\ 
         .flatMap(lambda x: [(item,x[0]) for item in x[1]]) 

Это будет работать !!

+0

Сделал почти все, кроме части flatMap, где вы разбиваете список как индивидуальную пару (k, v). Помощь FlatMap помогла, спасибо! – Magic

0

Сократить ключ, фильтр и присоединиться:

>>> rdd.mapValues(lambda _: 1) \ # Add key of value 1 
...  .reduceByKey(lambda x, y: x + y) \ # Count keys 
...  .filter(lambda x: x[1] >= 2) \ # Keep only if number is >= 2 
...  .join(rdd) # join with original (serves as filter) 
...  .mapValues(lambda x: x[0]) # reshape 
+0

не могли бы вы объяснить, что вы пытаетесь сделать? – Magic

Смежные вопросы