2015-10-05 2 views
0

Как преобразовать csv в Rdd [Double]? У меня есть ошибка: не может быть применен к (org.apache.spark.rdd.RDD [Unit]) в этой строке:Преобразование Rdd [Vector] в Rdd [Double]

val kd = new KernelDensity().setSample(rows) 

Мой полный код здесь:

import org.apache.spark.mllib.linalg.Vectors 
    import org.apache.spark.mllib.linalg.distributed.RowMatrix 
    import org.apache.spark.mllib.stat.KernelDensity 
    import org.apache.spark.rdd.RDD 
    import org.apache.spark.{SparkContext, SparkConf} 

class KdeAnalysis { 
    val conf = new SparkConf().setAppName("sample").setMaster("local") 
    val sc = new SparkContext(conf) 

    val DATAFILE: String = "C:\\Users\\ajohn\\Desktop\\spark_R\\data\\mass_cytometry\\mass.csv" 
    val rows = sc.textFile(DATAFILE).map { 
    line => val values = line.split(',').map(_.toDouble) 
     Vectors.dense(values) 
    }.cache() 



    // Construct the density estimator with the sample data and a standard deviation for the Gaussian 
    // kernels 
    val rdd : RDD[Double] = sc.parallelize(rows) 
    val kd = new KernelDensity().setSample(rdd) 
    .setBandwidth(3.0) 

    // Find density estimates for the given values 
    val densities = kd.estimate(Array(-1.0, 2.0, 5.0)) 
} 
+0

Я не вижу места в вашем где вы можете получить 'RDD [Unit]'. – zero323

ответ

2

С rows является RDD[org.apache.spark.mllib.linalg.Vector] следующая строка не может работать:

val rdd : RDD[Double] = sc.parallelize(rows) 

parallelize ожидает Seq[T] и RDD не Seq.

Даже если эта часть работала так, как вы ожидаете, ваш вход будет просто неправильным. Правильным аргументом для KernelDensity.setSample является либо RDD[Double], либо JavaRDD[java.lang.Double]. Похоже, в настоящий момент он не поддерживает многомерные данные.

Что касается вопроса от плитки вы можете flatMap

rows.flatMap(_.toArray) 

или даже лучше, когда вы создаете rows

val rows = sc.textFile(DATAFILE).flatMap(_.split(',').map(_.toDouble)).cache() 

, но я сомневаюсь, что это действительно то, что вам нужно.

0

Подготовили этот код, пожалуйста, оценить, если он может помочь вам ->

val doubleRDD = rows.map(_.toArray).flatMap(x => x)