2015-07-06 4 views
1

Я выполнил анализ основных компонентов на матрице, которую я ранее загружал sc.textFile. Вывод - это org.apache.spark.mllib.linalg.Matrix I, а затем преобразовал его в RDD [Vector [Double]].Запись вывода основных компонентов Анализ в текстовый файл

с:

import java.io.PrintWriter 

я сделал:

val pw = new PrintWriter("Matrix.csv") 
    rows3.collect().foreach(line => pw.println(line)) 
    pw.flush 

Выход CSV является перспективным. единственная проблема заключается в том, что каждая строка является DenseVector (некоторые значения). Как разбить каждую строку на соответствующие коэффициенты?

Большое спасибо

+0

Посмотрите [здесь] (http://stackoverflow.com/questions/29946190/how-to-change-rowmatrix-into-array- в-искрового или-экспорт-он-как-CSV/29946713 # 29946713)! – eliasah

ответ

1

Вы можете использовать результаты computePrincipalComponents и breeze.linalg.csvwrite:

import java.io.File 
import breeze.linalg.{DenseMatrix => BDM, csvwrite} 

val mat: RowMatrix = ... 
val pca = mat.computePrincipalComponents(...) 

csvwrite(
    new File("Matrix.csv"), 
    new BDM[Double](mat.numRows, mat.numCols, mat.toArray)) 
+0

, за исключением того, что при проверке выходного файла отображаются не все значения; это в основном то же самое, что и консольный вывод, например, первая строка: «value1 value2 ... (всего 523)» – fricadelle

+0

My bad :(Я обновил ответ. – zero323

0

конвертировать каждый вектор в строку (вы можете сделать это либо на водителя или исполнителями)

val pw = new PrintWriter("Matrix.csv") 
rows3.map(_.mkString(",")).collect().foreach(line => pw.println(line)) 
pw.flush 

редактировать: если данные слишком большой, чтобы поместиться в память водителя, вы можете попробовать что-то вроде этого:

val rdd = rows3.map(_.mkString(",")).zipWithIndex.cache 
val total = rdd.count 
val step = 10000 //rows in each chunk 
val range = 0 to total by step 
val limits = ranges.zip(range.drop(1)) 
limits.foreach { case(start, end) => 
        rdd.filter(x => x._2 >= start && x._2 < end) 
        .map(_._1) 
        .collect 
        .foreach(pw.println(_)) 
} 

Я не могу попробовать это, но это общая идея

+0

Это не то, как это должно быть сделано. Вы должны забыть, что вы пытаетесь писать с границами распределенной системы. Вы должны столкнуться с некоторыми проблемами параллелизма, если это не накладные расходы на JVM! – eliasah

+0

Если данные не слишком большие, это должно работать – lev

+0

Я знаю, что это сработает, но цель Spark как распределенной системы - ее масштабируемость. Кроме того, вы можете столкнуться с некоторыми проблемами параллелизма, учитывая неатомное преобразование по изменяемым данным, которое является результатом. – eliasah

Смежные вопросы