2017-01-07 3 views
0

У меня возникли проблемы со следующими упражнениями по сокращению карты в Spark с помощью python. Функция map возвращает следующий RDD.Смутно о поведении функции Уменьшить на карте уменьшить

РДД = [(3, ({0: [2], 1: [5], 3: [1]}, множество ([2]))),
(3, ({0 : [4], 1: [3], 3: [5]}, установите ([1])),
(1, ({0: [4, 5], 1: [2]}, установите ([3)))]

Я написал функцию редуктора, который должен сделать некоторые вычисления на кортежи с тем же ключом (в предыдущем примере первые два имеют ключ = 3, и последний ключ 1)

def Reducer(k, v): 
cluster = k[0] 
rows = [k[1], v[1]] 
g_p = {} 
I_p = set() 
for g, I in rows: 
    g_p = CombineStatistics(g_p, g) 
    I_p = I_p.union(I) 
return (cluster, [g_p, I_p]) 

проблема в том, что я ожидаю, что k и v всегда будут иметь один и тот же ключ (т. k[0]==v[0]). Но это не относится к этому коду.

Я работаю над платформой Databricks, и, честно говоря, это кошмар, не способный отлаживать, иногда даже не «печатать». Это очень сложно работать в этой среде.

ответ

0

Если вы хотите уменьшить RDD на основе того же ключа, вы должны использовать reduceByKey вместо преобразования reduce. После замены имени функции вы должны учитывать, что параметры функции reduceByKey являются значениями (k[1] и v[1] в вашем случае), а не целыми рядами rdd.

Печать внутри функции редуктора не будет работать в распределенной среде на databricks, поскольку эта функция оценивается исполнителями (внутри облака амазонки). Если вы начинаете искру в локальном режиме, все распечатки python будут работать (но я не уверен, что локальный режим доступен на databricks).

+0

Так что, несмотря на то, что в функции mapper я возвращаю (ключ, значение), только параметр значения будет передан редуктору? –

+0

Справа. И в результате вы возвращаете только часть «значений» RDD. – Mariusz

Смежные вопросы