2016-02-12 5 views
0

У меня есть 2RDD, и я хочу умножить элемент между этими 2 rdd.Multiply SparseVectors element-wise

Допустим, что я имею следующий RDD (пример):

a = ((1,[0.28,1,0.55]),(2,[0.28,1,0.55]),(3,[0.28,1,0.55])) 
aRDD = sc.parallelize(a) 
b = ((1,[0.28,0,0]),(2,[0,0,0]),(3,[0,1,0])) 
bRDD = sc.parallelize(b) 

Можно видеть, что b разрежен, и я хочу, чтобы избежать умножить нулевое значение с другим значением. Я делаю следующее:

from pyspark.mllib.linalg import Vectors 
def create_sparce_matrix(a_list): 
    length = len(a_list) 
    index = [i for i ,e in enumerate(a_list) if e !=0] 
    value = [e for i ,e in enumerate(a_list) if e !=0] 
    sv1 = Vectors.sparse(length,index,value) 
    return sv1 


brdd = b.map(lambda (ids,a_list):(ids,create_sparce_matrix(a_list))) 

И умножение:

combinedRDD = ardd + brdd 
result = combinedRDD.reduceByKey(lambda a,b:[c*d for c,d in zip(a,b)]) 

кажется, что я не могу Умножая sparce со списком в РДУ. Есть ли способ сделать это? Или еще один эффективный способ умножить элемент, когда один из двух RDD имеет много нулевых значений?

ответ

1

Один из способов справиться с этим, чтобы преобразовать aRDD в RDD[DenseVector]:

from pyspark.mllib.linalg import SparseVector, DenseVector, Vectors 

aRDD = sc.parallelize(a).mapValues(DenseVector) 
bRDD = sc.parallelize(b).mapValues(create_sparce_matrix) 

и использовать основные операции Numpy:

def mul(x, y): 
    assert isinstance(x, DenseVector) 
    assert isinstance(y, SparseVector) 
    assert x.size == y.size 
    return SparseVector(y.size, y.indices, x[y.indices] * y.values) 

aRDD.join(bRDD).mapValues(lambda xy: mul(*xy))