Этот вопрос вы столкнулись, можно разделить на следующие:
- Преобразование ваши рейтинги (я считаю) в
LabeledPoint
данные X.
- Сохранение X в libsvm формат.
1. Преобразование ваши рейтинги в LabeledPoint
данных X
Рассмотрим следующие исходные рейтинги:
val rawRatings: Seq[String] = Seq("0,1,1.0", "0,3,3.0", "1,1,1.0", "1,2,0.0", "1,3,3.0", "3,3,4.0", "10,3,4.5")
Вы можете обрабатывать эти сырые рейтинги как coordinate list matrix (COO).
Spark реализует распределенную матрицу, поддерживаемую RDD ее записей: CoordinateMatrix
, где каждая запись является кортежем (i: Long, j: Long, value: Double).
Примечание: КоординатаМатрица должна использоваться только в том случае, когда оба размера матрицы огромны, а матрица очень разрежена. (который, как правило, в случае пользователь/пункт рейтингами.)
import org.apache.spark.mllib.linalg.distributed.{CoordinateMatrix, MatrixEntry}
import org.apache.spark.rdd.RDD
val data: RDD[MatrixEntry] =
sc.parallelize(rawRatings).map {
line => {
val fields = line.split(",")
val i = fields(0).toLong
val j = fields(1).toLong
val value = fields(2).toDouble
MatrixEntry(i, j, value)
}
}
Теперь давайте преобразуем, что RDD[MatrixEntry]
в CoordinateMatrix
и извлечь индексированные строки:
val df = new CoordinateMatrix(data) // Convert the RDD to a CoordinateMatrix
.toIndexedRowMatrix().rows // Extract indexed rows
.toDF("label", "features") // Convert rows
2. Сохранение LabeledPoint данных в libsvm формат
С Спарк 2.0, вы можете сделать это, используя DataFrameWriter
. Давайте создадим небольшой пример с некоторыми данными фиктивной LabeledPoint (вы также можете использовать DataFrame
мы создали ранее):
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint
val pos = LabeledPoint(1.0, Vectors.dense(1.0, 0.0, 3.0))
val neg = LabeledPoint(0.0, Vectors.sparse(3, Array(0, 2), Array(1.0, 3.0)))
val df = Seq(neg,pos).toDF("label","features")
К сожалению, мы не можем использовать DataFrameWriter
напрямую, потому что в то время как большинство компонентов трубопровода поддерживают обратную совместимость для загрузки, некоторые существующие DataFrames и конвейеры в версиях Spark до 2.0, которые содержат векторные или матричные столбцы, возможно, необходимо перенести в новые векторные и матричные типы spark.ml.
Утилиты для преобразования DataFrame столбцов из mllib.linalg
в ml.linalg
типов (и наоборот) могут быть найдены в org.apache.spark.mllib.util.MLUtils.
В нашем случае мы должны сделать следующее (как для фиктивных данных и DataFrame
от step 1.
)
import org.apache.spark.mllib.util.MLUtils
// convert DataFrame columns
val convertedVecDF = MLUtils.convertVectorColumnsToML(df)
Теперь давайте сохранить DataFrame:
convertedVecDF.write.format("libsvm").save("data/foo")
И мы можем проверить содержимое файлов:
$ cat data/foo/part*
0.0 1:1.0 3:3.0
1.0 1:1.0 2:0.0 3:3.0
EDIT: В текущей версии искры (2.1.0) нет необходимости использовать mllib
пакет. Вы можете просто сохранить LabeledPoint
данных в libsvm формата, как показано ниже:
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.ml.feature.LabeledPoint
val pos = LabeledPoint(1.0, Vectors.dense(1.0, 0.0, 3.0))
val neg = LabeledPoint(0.0, Vectors.sparse(3, Array(0, 2), Array(1.0, 3.0)))
val df = Seq(neg,pos).toDF("label","features")
df.write.format("libsvm").save("data/foo")
версии искры вы используете? – eliasah
Использование версии 2.0! –