2016-02-17 3 views
0

Не могли бы вы помочь мне достичь следующего:Pyspark - Нахождение пересечения ключевых пар значений

Рассмотрим I/P, который представляет собой список пар ключ-значение, где ключ представляет собой (кортеж) и значение [ список]. Если в i/p есть два одинаковых ключа, тогда должно быть значение. пересекались. Если вы не найдете другую пару ключей, тогда этот ключ следует игнорировать в o/p.

Пример:

>>> data = [((1, 2), [2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14]), ((1, 2), [1, 3, 4, 5, 6, 7, 10, 11])] 
>>> rdd = sc.parallelize(data) 
>>> rdd.reduceByKey(lambda x,y : list(set(x).intersection(y))).collect() 

о/р: [((1, 2), [3, 4, 5, 6, 7, 10, 11])]

Поскольку существуют два вхождения той же клавише, следовательно, значения пересекались.

>>> data = [((1, 2), [2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14]), ((1, 3), [1, 3, 4, 5, 6, 7, 10, 11])] 
>>> rdd = sc.parallelize(data) 
>>> rdd.reduceByKey(lambda x,y : list(set(x).intersection(y))).collect() 

о/р, что я получаю: [((1, 2), [2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14]), ((1, 3), [1, 3, 4, 5, 6, 7, 10, 11])]

желаемому результату: о/р не должно быть ничего. Поскольку нет пары ключей.

ответ

1

Я реализовал логику, как показано ниже

datardd.map(lambda x:(x[0],(x[1],1))).reduceByKey(lambda x,y:(set(x[0]).intersection(y[0]),x[1]+y[1])).filter(lambda x:x[1][1]>1).map(lambda x:(x[0],list(x[1][0]))).collect() 
  1. Карта переменную счетчика для данного ключа вместе с существующим значением списка в виде

    [(Key, (значение, счетчик))]:

    ex: [((1, 2), ([2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14], 1))]

  2. Использование reduceByKey для реализации операции пересечения и приращения счетчика

  3. фильтра и публиковать значения, для которых значения счетчика> 1

Смежные вопросы