2015-12-09 4 views
2

Я пытаюсь создать разреженный вектор (класс mllib.linalg.Vectors, а не по умолчанию), но я не могу понять, как использовать Seq. У меня есть небольшой тестовый файл с тремя номерами/строками, который я конвертирую в rdd, разбивая текст в двойниках, а затем группирую строки по их первому столбцу.Как объявить разреженный вектор в Spark с помощью Scala?

Тестовый файл

1 2 4 
1 3 5  
1 4 8  
2 7 5  
2 8 4  
2 9 10 

Код

val data = sc.textFile("/home/savvas/DWDM/test.txt") 
val data2 = data.map(s => Vectors.dense(s.split(' ').map(_.toDouble))) 
val grouped = data2.groupBy(_(0)) 

Это приводит к grouped иметь эти значения

(2.0,CompactBuffer([2.0,7.0,5.0], [2.0,8.0,4.0], [2.0,9.0,10.0])) 
(1.0,CompactBuffer([1.0,2.0,4.0], [1.0,3.0,5.0], [1.0,4.0,8.0])) 

Но я не могу показаться, чтобы выяснить следующий шаг. Мне нужно взять каждую строку grouped и создать для нее вектор, так что каждая строка нового RDD имеет вектор с третьим значением CompactBuffer в индексе, указанном вторым значением. Короче говоря, я имею в виду, что я хочу, чтобы мои данные были в этом примере.

[0, 0, 0, 0, 0, 0, 5.0, 4.0, 10.0, 0] 
[0, 4.0, 5.0, 8.0, 0, 0, 0, 0, 0, 0] 

Я знаю, что мне нужно использовать разреженный вектор и что есть три способа его создания. Я попытался использовать Seq с tuple2 (индекс, значение), но я не могу понять, как создать такой Seq.

ответ

2

Одним из возможных решений является то, что показано ниже. Первый позволяет преобразовывать данные ожидаемые тип:

import org.apache.spark.rdd.RDD 

val pairs: RDD[(Double, (Int, Double))] = data.map(_.split(" ") match { 
    case Array(label, idx, value) => (label.toDouble, (idx.toInt, value.toDouble)) 
}) 

рядом найти максимальный индекс (размер векторов):

val nCols = pairs.map{case (_, (i, _)) => i}.max + 1 

группы и конвертировать:

import org.apache.spark.mllib.linalg.SparseVector 

def makeVector(xs: Iterable[(Int, Double)]) = { 
    val (indices, values) = xs.toArray.sortBy(_._1).unzip 
    new SparseVector(nCols, indices.toArray, values.toArray) 
} 

val transformed: RDD[(Double, SparseVector)] = pairs 
    .groupByKey 
    .mapValues(makeVector) 

Другим способом вы можете справиться это, предполагая, что первые элементы могут быть безопасно преобразованы в и из целого числа, заключается в использовании CoordinateMatrix:

import org.apache.spark.mllib.linalg.distributed.{CoordinateMatrix, MatrixEntry} 

val entries: RDD[MatrixEntry] = data.map(_.split(" ") match { 
    case Array(label, idx, value) => 
    MatrixEntry(label.toInt, idx.toInt, value.toDouble) 
}) 

val transformed: RDD[(Double, SparseVector)] = new CoordinateMatrix(entries) 
    .toIndexedRowMatrix 
    .rows 
    .map(row => (row.index.toDouble, row.vector)) 
+0

Он выполняет эту работу, но кажется немного сложной, и создается впечатление, что для этого есть более простой код. Во всяком случае, я действительно новичок в scala, и я очень запутался! – Savvas

Смежные вопросы