2016-10-30 2 views
0

Я этот набор данных (я выкладывание несколько строк):Получить CLUSTER_ID и остальные таблицы с помощью искры MLlib KMeans

11.97,1355,401 
3.49,25579,12908 
9.29,129186,10882 
28.73,10153,22356 
3.69,22872,9798 
13.49,160371,2911 
24.36,106764,867 
3.99,163670,16397 
19.64,132547,401 

И я пытаюсь присвоить все эти строки 4 кластеров с использованием K-средства. Для этого я использую код, который я вижу в этом посте: Spark MLLib Kmeans from dataframe, and back again

val data = sc.textFile("/user/cloudera/TESTE1") 
val idPointRDD = data.map(s => (s(0), Vectors.dense(s(1).toInt,s(2).toInt))).cache() 
val clusters = KMeans.train(idPointRDD.map(_._2), 4, 20) 
val clustersRDD = clusters.predict(idPointRDD.map(_._2)) 
val idClusterRDD = idPointRDD.map(_._1).zip(clustersRDD) 
val idCluster = idClusterRDD.toDF("purchase","id","product","cluster") 

Я получаю эти выходы:

scala> import org.apache.spark.mllib.clustering.{KMeans, KMeansModel} 
import org.apache.spark.mllib.clustering.{KMeans, KMeansModel} 

scala> import org.apache.spark.mllib.linalg.Vectors 
import org.apache.spark.mllib.linalg.Vectors 

scala> val data = sc.textFile("/user/cloudera/TESTE") 
data: org.apache.spark.rdd.RDD[String] = /user/cloudera/TESTE MapPartitionsRDD[7] at textFile at <console>:29 

scala> val idPointRDD = data.map(s => (s(0), Vectors.dense(s(1).toInt,s(2).toInt))).cache() 
idPointRDD: org.apache.spark.rdd.RDD[(Char, org.apache.spark.mllib.linalg.Vector)] = MapPartitionsRDD[8] at map at <console>:31 

Но когда я запускаю его я получаю следующее сообщение об ошибке:

java.lang.UnsupportedOperationException: Schema for type Char is not supported 
    at org.apache.spark.sql.catalyst.ScalaReflection$class.schemaFor(ScalaReflection.scala:715) 

Как я могу решить эту проблему?

Большое спасибо!

+0

Как выполнить этот код? В искровой оболочке или приложении? Где и когда вы видите ошибку? В вашей среде IDE, при компиляции или исполнении? – eliasah

+0

@eliasah Я бегу, используя Spark-Shell на Cloudera Vm. Ошибка появляется, когда я выполняю последнюю строку кода. Я пытаюсь получить dataframe с тремя столбцами оригинала + новый столбец, который указывает кластер –

+0

Можете ли вы распечатать содержимое и тип idPointRdd из оболочки? Не то, что вы подозреваете, а фактический результат. – eliasah

ответ

1

Вот что. Фактически вы читаете CSV значений в RDD String и не преобразовываете его правильно в числовые значения. Вместо этого, поскольку строка представляет собой набор символов, когда вы вызываете s (0) на пример, это фактически преобразует значение Char в целое или двойное, но это не то, что вы действительно ищете.

Вы должны разделить ваш val data : RDD[String]

val data : RDD[String] = ??? 
val idPointRDD = data.map { 
    s => 
     s.split(",") match { 
     case Array(x,y,z) => Vectors.dense(x.toDouble, Integer.parseInt(y).toDouble,Integer.parseInt(z).toDouble) 
     } 
}.cache() 

Это должно работать для вас!

+0

@elisah код работает отлично! :) Только еще одно сомнение, почему оно предсказывает только две колонки? Я положил 3 столбца в начале, и это только позволяет мне кластер 2 ... Я обновил код –

+0

Потому что «я получаю этот загар: idCluster.take (10) res2: Array [org.apache.spark .sql.Row] = Array ([11.97,1], [3.49,1], [9,29,2], [28,73,1], [3,69,1], [13,49,0], [24,36,2], [3.99,0], [19.64,2], [4.99,0]) И это не имеет смысла :) –

+0

В соответствии с кодом, который вы использовали до использования двух столбцов. Пожалуйста, заново отредактируйте свой вопрос, как это было раньше. Это создаст путаницу, поскольку на самом деле он использует код вместо моего ответа – eliasah

Смежные вопросы