2016-02-10 2 views
1

Если предположить, что у меня есть следующий RDD:Добавить увеличивающиеся переменную в РДУ

test1 = (('trial1',[1,2]),('trial2',[3,4])) 
test1RDD = sc.parallelize(test1) 

Как я могу создать следующий RDD:

((1,'trial1',[1,2]),(2,'trial2',[3,4])) 

Я попытался с аккумуляторами, но он не работает, как аккумуляторы не могут быть доступный в задачах:

def increm(keyvalue): 
    global acc 
    acc +=1 
    return (acc.value,keyvalue[0],keyvalue[1]) 


acc = sc.accumulator(0) 
test1RDD.map(lambda x: increm(x)).collect() 

Любая идея, как это можно сделать?

ответ

6

Вы можете использовать zipWithIndex

zipWithIndex()

Молнии этот RDD с его индексами элементов.

Заказ сначала основан на индексе раздела, а затем заказ элементов в каждом разделе. Таким образом, первый элемент в первом разделе получает индекс 0, а последний элемент последнего раздела получает наибольший индекс.

Этот метод должен запускать искровое задание, если этот RDD содержит больше , чем один раздел.

>>> sc.parallelize(["a", "b", "c", "d"], 3).zipWithIndex().collect() 
[('a', 0), ('b', 1), ('c', 2), ('d', 3)] 

и использовать map для преобразования RDD иметь индекс перед новым РДД

Это непроверенное, как я не имею никакого окружения:

test1 = (('trial1',[1,2]),('trial2',[3,4])) 
test1RDD = sc.parallelize(test1) 
test1RDD.zipWithIndex().map(lambda x : (x[1],x[0])) 
Смежные вопросы