ответ

6
import org.apache.spark.mllib.linalg.{Vectors,Vector,Matrix,SingularValueDecomposition,DenseMatrix,DenseVector} 
import org.apache.spark.mllib.linalg.distributed.RowMatrix 

def computeInverse(X: RowMatrix): DenseMatrix = { 
    val nCoef = X.numCols.toInt 
    val svd = X.computeSVD(nCoef, computeU = true) 
    if (svd.s.size < nCoef) { 
    sys.error(s"RowMatrix.computeInverse called on singular matrix.") 
    } 

    // Create the inv diagonal matrix from S 
    val invS = DenseMatrix.diag(new DenseVector(svd.s.toArray.map(x => math.pow(x,-1)))) 

    // U cannot be a RowMatrix 
    val U = new DenseMatrix(svd.U.numRows().toInt,svd.U.numCols().toInt,svd.U.rows.collect.flatMap(x => x.toArray)) 

    // If you could make V distributed, then this may be better. However its alreadly local...so maybe this is fine. 
    val V = svd.V 
    // inv(X) = V*inv(S)*transpose(U) --- the U is already transposed. 
    (V.multiply(invS)).multiply(U) 
    } 
3

У меня были проблемы с использованием этой функции с опцией

conf.set("spark.sql.shuffle.partitions", "12") 

Строки в RowMatrix получил перемешиваются.

Вот обновление, которое работало для меня

import org.apache.spark.mllib.linalg.{DenseMatrix,DenseVector} 
import org.apache.spark.mllib.linalg.distributed.IndexedRowMatrix 

def computeInverse(X: IndexedRowMatrix) 
: DenseMatrix = 
{ 
    val nCoef = X.numCols.toInt 
    val svd = X.computeSVD(nCoef, computeU = true) 
    if (svd.s.size < nCoef) { 
    sys.error(s"IndexedRowMatrix.computeInverse called on singular matrix.") 
    } 

    // Create the inv diagonal matrix from S 
    val invS = DenseMatrix.diag(new DenseVector(svd.s.toArray.map(x => math.pow(x, -1)))) 

    // U cannot be a RowMatrix 
    val U = svd.U.toBlockMatrix().toLocalMatrix().multiply(DenseMatrix.eye(svd.U.numRows().toInt)).transpose 

    val V = svd.V 
    (V.multiply(invS)).multiply(U) 
} 
0

Матрицы U возвращаемой X.computeSVD имеет размеры MXk где м является количеством строк исходного (распределенным) RowMatrix X. Можно был бы ожидать м будет большой (возможно, больше k), поэтому нецелесообразно собирать его в драйвере, если мы хотим, чтобы наш код масштабировался до действительно больших значений m.

Я бы сказал, что оба из нижеприведенных решений страдают от этого недостатка. Ответ, заданный @Alexander Kharlamov, вызывает val U = svd.U.toBlockMatrix().toLocalMatrix(), который собирает матрицу в драйвере. То же самое происходит с ответом @Climbs_lika_Spyder (кстати, ваши ник-камни !!), который вызывает svd.U.rows.collect.flatMap(x => x.toArray). Я предпочел бы полагаться на распределенное умножение матрицы, такое как код Scala, отправленный here.

+0

Я не вижу никаких обратных вычислений по добавленной ссылке. –

+0

@Climbs_lika_Spyder Ссылка о распределении распределенной матрицы для замены умножения локальной матрицы '(V.multiply (invS)). Умножить (U)' в последней строке вашего решения, чтобы вам не нужно было собирать 'U', в драйвере. Я думаю, что 'V' и' invS' недостаточно велики, чтобы создавать проблемы. – Pablo

Смежные вопросы