2016-04-08 3 views
0

Во-первых, я новичок в Spark и Python. Я пытаюсь преобразовать RDD (упругий распределенный набор данных) в другой.PySpark IPython - уменьшить RDD в новый RDD с помощью другого ключа

вход РДД является:

(u'Task1', (u'James', 10)), 
(u'Task1', (u'James', 15)), 
(u'Task1', (u'James', 18)), 
(u'Task1', (u'James', 11)), 
(u'Task1', (u'Oliver', 10)), 
(u'Task1', (u'Oliver', 15)), 
(u'Task2', (u'Oliver', 18)), 
(u'Task2', (u'Oliver', 11)), 

Теперь я пытаюсь создать функцию, которая выводит сумму часов для каждого человека, независимо от задачи:

def extract_time_tracking(time_bookings): 
    ??? 
    return (person, total_hours) 
time_trackings_sum = input_RDD.???(extract_time_tracking) 

Выход должно быть:

(u'James', 54), # has been working on Task1 only 
(u'Oliver', 54), # has been working on Task1 and Task2 

Я использую PySpark IPython. Я думал о combByKey или reduceByKey, но они всегда используют один и тот же ключ. Но в моем случае результирующий ключ отличается от входного ключа?!?!?

Спасибо за любую помощь.

ответ

1

Используйте функцию map для преобразования объекта таким образом, чтобы первым элементом кортежа был необходимый вам ключ. Поскольку вы не заботитесь о задаче, вы можете полностью отказаться от нее.

input_RDD.map(lambda x: x[1]).reduceByKey(lambda x,y: x+y) 

Если позже вы хотите просто перенести ключ, вы могли бы сделать более сложную карту:

input_RDD.map(lambda x: (x[1][0],(x[0],x[1][1])) 
+0

Не могли бы вы объяснить первый лямбда в функции карты немного больше? почему х [1]? кортеж будет задачей, Matthias

+1

@ Matthias Предположим, что x = (u'Task1 ', (u'James', 10)), x [0] будет u'Task1 ', а x [1] будет (u'James ', 10), используя [нормальный набор кортежей] (https://docs.python.org/2/tutorial/datastructures.html#tuples-and-sequences). Поскольку мы рассчитываем рассчитать часы по имени, это две части информации, которые нам нужны. (Дополнительные комментарии приходят и в других частях этой проблемы.) –

+0

Способ думать о «карте» - это преобразовывать RDD, используя произвольную функцию, которую вы пишете. Если вы начинаете с RDD, объекты которого имеют тип X, и вы хотите иметь тип Y, вы пишете карту, которая превратит один X в один Y, а затем вы будете применять его по строкам параллельно. Эта проблема нуждается только в простой трансформации - отбросить задачу. Предположим, вместо этого мы хотели бы суммировать задачи, и у нас есть эти имена на пути. Затем мы делаем следующее: 'input_RDD.map (lambda x: (x [0], x [1] [1]). ReduceByKey (lambda x, y: x + y)' –

0
def extract_time_tracking(time_bookings): 
val splits = rec.split(",") 
val person = splits(1).replaceAll(" \\(u'", "").replaceAll("'", "") 
val total_hours = splits(2).replaceAll("\\)", "").trim().toInt 
return (person, total_hours) 


input_RDD.map(extract_time_tracking).reduceByKey 

Я использую scala, поэтому, пожалуйста, проверьте синтаксис.

Смежные вопросы