2017-01-16 3 views
2

Я пытаюсь построить систему рекомендаций с использованием Spark, ML ALS, где данными являются следующимиКак использовать реализацию искровой ML АЛС, которая поддерживает общие типы идентификаторов (Int и Long)

"User-ID";"ISBN "; "Book-Rating" 
276725;034545104;0 
276726;0155061224;5 
276727;0446520802;0 
276729;052165615;3 
276729;0521795028;6 

Я использую Spark 2.1.0 и mongoldb для загрузки данных. Вот мой фрагмент кода, который определяет кадр данных и его шема после кастинга.

/* 
* Chargement de données de rating 
*/ 

val dfrating = spark.loadFromMongoDB(readConfig) 

val bookRatings = dfrating.selectExpr("cast(User_ID as Long) User_ID " ,"cast(ISBN as Long) ISBN ", "Book_Rating") 

bookRatings.printSchema() 

val als = new ALS().setMaxIter(10).setRegParam(0.01).setUserCol("User_ID").setItemCol("ISBN").setRatingCol("Book_Rating") 
val model = als.fit(training) 

       ———- After compiling, I have got —— 
root 
|-- User_ID: long (nullable = true) 
|-- ISBN: long (nullable = true) 
|-- Book_Rating: integer (nullable = true) 

+-------+----------+-----------+ 
|User_ID|  ISBN|Book_Rating| 
+-------+----------+-----------+ 
| 215| 61030147|   6| 
| 5750|1853260045|   0| 
| 11676| 743244249|   0| 
| 11676|1551665700|   0| 

Вызванные: java.lang.IllegalArgumentException: АЛС ** поддерживает только значение в диапазоне Integer для столбца сек USER_ID и ISBN. **** Значение ** 8.477024456E9 ** было вне диапазона Integer. ****** at org.apache.spark.ml.recommendation.ALSModelParams $$ anonfun $ 1.apply $ mcID $ sp (ALS.scala : 87)

    ————————————————————— 

Есть ли еще какое-то решение для запуска? У меня есть такие предложения (How to use mllib.recommendation if the user ids are string instead of contiguous integers?How to use long user ID in PySpark ALS, а также Non-integer ids in Spark MLlib ALS) по той же проблеме, но я не знаю, с чего начать. Любая помощь ! Заранее спасибо.


@GPI Спасибо за ваше предложение. Вот что я делаю.

val isbn_als = new StringIndexer() 
     .setHandleInvalid("skip") 
     .setInputCol("ISBN") 
     .setOutputCol("ISBN_als") 
     .fit(uRatings) 

val isbn_als_reverse = new IndexToString() 
     .setInputCol("prediction") 
     .setOutputCol("predictedLabel") 

val als = new ALS().setMaxIter(10).setRegParam(0.01).setUserCol("User_ID").setItemCol("ISBN_als").setRatingCol("Book_Rating") 

    /* 
     * On définit l'ordre des opérations à effectuer 
     */ 

    println("On passe au Pipeline") 

    val alsPipeline = new Pipeline().setStages(Array(isbn_als, als, isbn_als_reverse)) 

    /* 
     * On construit le modèle de recommandation à partir des données de Training 
     */ 

    println("On passe à la construction du modèle") 

    val alsModel = alsPipeline.fit(training) 


    /* 
     * On exécute le modèle sur les données de Test, puis on affiche un échantillon de prédictions 
     */ 

    println("On exécute le modèle sur les données de Test") 


    val alsPredictions = alsModel.transform(test).na.drop() 


    println("Affichage des prédictions") 

    alsPredictions.select($"User_ID",$"ISBN", $"Book_Rating", $"prediction").show(20) 

Но у меня есть это исключение, когда я использую IndexToString() на конвейере.

On passe au Pipeline 
On passe à la construction du modèle 
On exécute le modèle sur les données de Test 
Exception in thread "main" java.lang.ClassCastException: org.apache.spark.ml.attribute.UnresolvedAttribute$ cannot be cast to org.apache.spark.ml.*attribute.NominalAttribute* 
    at org.apache.spark.ml.feature.IndexToString.transform(StringIndexer.scala:313) 
    at org.apache.spark.ml.PipelineModel$$anonfun$transform$1.apply(Pipeline.scala:305) 
    at org.apache.spark.ml.PipelineModel$$anonfun$transform$1.apply(Pipeline.scala:305) 
    at scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:57) 
    at scala.collection.IndexedSeqOptimized$class.foldLeft(IndexedSeqOptimized.scala:66) 

---------------------------------------

Когда я не использую IndexToString(), у меня есть отрицательное предсказание.

+-------+---------+-----------+-------------+ 
|User_ID|  ISBN|Book_Rating| prediction| 
+-------+---------+-----------+-------------+ 
| 140340|786881852|   10| 6.9798374| 
| 127327|786881852|   0|-1.2718141E-4| 
| 103336|786881852|   0| 1.2374072| 
| 138578|786881852|   9|  8.200257| 
| 172742|786881852|   0| -1.3278971| 
| 31909|786881852|   6|  5.997123| 
| 69554|786881852|   5|  2.819587| 
| 173650|786881852|   0| 0.42850634| 

---------------------------------

Я полагаю, что отрицательный прогноз из-за IndexToString(), который не используется. Если да, то как использовать IndexToString() в конвейере? Спасибо в продвинутом состоянии

ответ

2

Исключение, которое вы получаете, испускается частью IndexToString, которая неправильно сконфигурирована. Вы заставляете его декодировать предсказание обратно на String, но прогноз не является продуктом (ISBN), это рейтинг: ALS предсказывает рейтинги, а не продукты.

Что по очереди означает, что вам не нужен инвертор.

Смотрите следующий, рабочий образец:

scala> import org.apache.spark.ml.Pipeline 
import org.apache.spark.ml.Pipeline 

scala> import org.apache.spark.ml.recommendation._ 
import org.apache.spark.ml.recommendation._ 

scala> import org.apache.spark.ml.feature._ 
import org.apache.spark.ml.feature._ 

// This is just a helper 
scala> case class Rating(user: Long, isbn: String, rating: Double) 
defined class Rating 

// Let's create 2 books, 3 users, 3 ratings to train the model  
scala> val rawRatings = Seq(Rating(1, "1234567890123", 1), Rating(2, "123456789", 2), Rating(3, "123456789", 3)) 
rawRatings: Seq[Rating] = List(Rating(1,1234567890123,1.0), Rating(2,123456789,2.0), Rating(3,123456789,3.0)) 

scala> val ratings = spark.createDataFrame(rawRatings) 

scala> val isbn_als = new StringIndexer().setInputCol("isbn").setOutputCol("isbnIDX") 
isbn_als: org.apache.spark.ml.feature.StringIndexer = strIdx_53d752f20587 

scala> val als = new ALS().setUserCol("user").setItemCol("isbnIDX").setRatingCol("rating") 
als: org.apache.spark.ml.recommendation.ALS = als_41eff9ae835d 

scala> val stages = Array(isbn_als, als) 
stages: Array[org.apache.spark.ml.PipelineStage with org.apache.spark.ml.util.DefaultParamsWritable{def copy(extra: org.apache.spark.ml.param.ParamMap): org.apache.spark.ml.PipelineStage with org.apache.spark.ml.util.DefaultParamsWritable{def copy(extra: org.apache.spark.ml.param.ParamMap): org.apache.spark.ml.PipelineStage with org.apache.spark.ml.util.DefaultParamsWritable}}] = Array(strIdx_53d752f20587, als_41eff9ae835d, idxToStr_9b2ca994074f) 

// Do the actual training 
scala> val pipeline = new Pipeline().setStages(stages) 
pipeline: org.apache.spark.ml.Pipeline = pipeline_5f05891139b6 

scala> val pipeModel = pipeline.fit(ratings) 
pipeModel: org.apache.spark.ml.PipelineModel = pipeline_5f05891139b6 

// And make predictions for any user/book combination 
scala> case class UserBook(user: Long, isbn: String) 
defined class UserBook 

scala> val testSet = Seq(UserBook(1, "123456789")) 
testSet: Seq[UserBook] = List(UserBook(1,123456789)) 

scala> val testDF = spark.createDataFrame(testSet) 
testDF: org.apache.spark.sql.DataFrame = [user: bigint, isbn: string] 

scala> pipeModel.transform(testDF).show 
+----+--------------+-------+----------+ 
|user|   isbn|isbnIDX|prediction| 
+----+--------------+-------+----------+ 
| 1|123456789| 0.0| 0.7389956| 
+----+--------------+-------+----------+ 

Здесь «предсказание» рейтинги прогноза для книги ISBN 123456789для пользователя 1. isbnIDX используется для вычислительных целей и не должно быть обратными, потому что у нас уже есть isbn в dataframe.

+0

GPI, он исправляет мой допрос. Спасибо за ваше время. –

+0

@ MAA-BIK вы можете пометить ответ как принятый, если он работает для вас, и/или опросить его. – GPI

+0

GPI извините! Это работает для меня –

Смежные вопросы