2015-10-23 3 views
1

У меня есть матрица в формате CoordinateMatrix в Scala. Матрица разрежен и заходы выглядеть (на coo_matrix.entries.collect),Эффективный способ суммирования строк/столбцов IndexedRowmatrix в Apache Spark

Array[org.apache.spark.mllib.linalg.distributed.MatrixEntry] = Array(
    MatrixEntry(0,0,-1.0), MatrixEntry(0,1,-1.0), MatrixEntry(1,0,-1.0), 
    MatrixEntry(1,1,-1.0), MatrixEntry(1,2,-1.0), MatrixEntry(2,1,-1.0), 
    MatrixEntry(2,2,-1.0), MatrixEntry(0,3,-1.0), MatrixEntry(0,4,-1.0), 
    MatrixEntry(0,5,-1.0), MatrixEntry(3,0,-1.0), MatrixEntry(4,0,-1.0), 
    MatrixEntry(3,3,-1.0), MatrixEntry(3,4,-1.0), MatrixEntry(4,3,-1.0), 
    MatrixEntry(4,4,-1.0)) 

Это лишь небольшой размер выборки. Матрица имеет размер N x N (где N = 1 миллион), хотя большая часть ее разрежена. Каков один из эффективных способов получить суммы строк этой матрицы в Spark Scala? Цель состоит в том, чтобы создать новый RDD, состоящий из сумм строк, т. Е. Размера N, где 1-й элемент представляет собой сумму строки строки1 и т. Д.

Я всегда могу преобразовать эту координатуMatrix в IndexedRowMatrix и запустить цикл for для вычисления строк одновременно, но это не самый эффективный подход.

любая идея очень ценится.

ответ

2

Это будет довольно дорого из-за перетасовки (это часть, которую вы не можете избежать здесь), но вы можете конвертировать записи в PairRDD и сократить ключ:

import org.apache.spark.mllib.linalg.distributed.{MatrixEntry, CoordinateMatrix} 
import org.apache.spark.rdd.RDD 

val mat: CoordinateMatrix = ??? 
val rowSums: RDD[Long, Double)] = mat.entries 
    .map{case MatrixEntry(row, _, value) => (row, value)} 
    .reduceByKey(_ + _) 

В отличие от решения на основе indexedRowMatrix:

import org.apache.spark.mllib.linalg.distributed.IndexedRow 

mat.toIndexedRowMatrix.rows.map{ 
    case IndexedRow(i, values) => (i, values.toArray.sum) 
} 

не требуется groupBy трансформация или промежуточное звено SparseVectors.

Смежные вопросы