2016-05-18 2 views
3

Я пытаюсь получить доступ к полю внутри Tuple2, и компилятор возвращает мне ошибку. Программное обеспечение пытается подталкивать класс case в теме kafka, затем я хочу восстановить его с помощью искровой потоковой передачи, чтобы я мог подавать алгоритм машинного обучения и сохранять результаты в экземпляре mongo.Об ошибке доступа к полю внутри Tuple2

Решенный!

я, наконец, решить мою проблему, я собираюсь опубликовать окончательное решение:

Это проект GitHub:

https://github.com/alonsoir/awesome-recommendation-engine/tree/develop 

build.sbt

name := "my-recommendation-spark-engine" 

version := "1.0-SNAPSHOT" 

scalaVersion := "2.10.4" 

val sparkVersion = "1.6.1" 

val akkaVersion = "2.3.11" // override Akka to be this version to match the one in Spark 

libraryDependencies ++= Seq(
"org.apache.kafka" % "kafka_2.10" % "0.8.1" 
    exclude("javax.jms", "jms") 
    exclude("com.sun.jdmk", "jmxtools") 
    exclude("com.sun.jmx", "jmxri"), 
//not working play module!! check 
//jdbc, 
//anorm, 
//cache, 
// HTTP client 
"net.databinder.dispatch" %% "dispatch-core" % "0.11.1", 
// HTML parser 
"org.jodd" % "jodd-lagarto" % "3.5.2", 
"com.typesafe" % "config" % "1.2.1", 
"com.typesafe.play" % "play-json_2.10" % "2.4.0-M2", 
"org.scalatest" % "scalatest_2.10" % "2.2.1" % "test", 
"org.twitter4j" % "twitter4j-core" % "4.0.2", 
"org.twitter4j" % "twitter4j-stream" % "4.0.2", 
"org.codehaus.jackson" % "jackson-core-asl" % "1.6.1", 
"org.scala-tools.testing" % "specs_2.8.0" % "1.6.5" % "test", 
"org.apache.spark" % "spark-streaming-kafka_2.10" % "1.6.1" , 
"org.apache.spark" % "spark-core_2.10" % "1.6.1" , 
"org.apache.spark" % "spark-streaming_2.10" % "1.6.1", 
"org.apache.spark" % "spark-sql_2.10" % "1.6.1", 
"org.apache.spark" % "spark-mllib_2.10" % "1.6.1", 
"com.google.code.gson" % "gson" % "2.6.2", 
"commons-cli" % "commons-cli" % "1.3.1", 
"com.stratio.datasource" % "spark-mongodb_2.10" % "0.11.1", 
// Akka 
"com.typesafe.akka" %% "akka-actor" % akkaVersion, 
"com.typesafe.akka" %% "akka-slf4j" % akkaVersion, 
// MongoDB 
"org.reactivemongo" %% "reactivemongo" % "0.10.0" 
) 

packAutoSettings 

//play.Project.playScalaSettings 

Кафка Производитель

package example.producer 

import play.api.libs.json._ 
import example.utils._ 
import scala.concurrent.Future 
import example.model.{AmazonProductAndRating,AmazonProduct,AmazonRating} 
import example.utils.AmazonPageParser 
import scala.concurrent.ExecutionContext.Implicits.global 
import scala.concurrent.Future 


/** 
args(0) : productId 
args(1) : userdId 

Usage: ./amazon-producer-example 0981531679 someUserId 3.0 
*/ 
object AmazonProducerExample { 

def main(args: Array[String]): Unit = { 

val productId = args(0).toString 
val userId = args(1).toString 
val rating = args(2).toDouble 
val topicName = "amazonRatingsTopic" 

val producer = Producer[String](topicName) 

//0981531679 is Scala Puzzlers... 
AmazonPageParser.parse(productId,userId,rating).onSuccess { case amazonRating => 
    //Is this the correct way? the best performance? possibly not, what about using avro or parquet? How can i push data in avro or parquet format? 
    //You can see that i am pushing json String to kafka topic, not raw String, but is there any difference? 
    //of course there are differences... 
    producer.send(Json.toJson(amazonRating).toString) 
    //producer.send(amazonRating.toString) 
    println("amazon product with rating sent to kafka cluster..." + amazonRating.toString) 
    System.exit(0) 
} 

} 
} 

Это определение необходимых тематических классов (ОБНОВЛЕНО), файл с именем models.scala:

package example.model 

import play.api.libs.json.Json 
import reactivemongo.bson.Macros 

case class AmazonProduct(itemId: String, title: String, url: String, img: String, description: String) 
case class AmazonRating(userId: String, productId: String, rating: Double) 

case class AmazonProductAndRating(product: AmazonProduct, rating: AmazonRating) 

// For MongoDB 
object AmazonRating { 
implicit val amazonRatingHandler = Macros.handler[AmazonRating] 
implicit val amazonRatingFormat = Json.format[AmazonRating] 
//added using @Yuval tip 
lazy val empty: AmazonRating = AmazonRating("-1", "-1", -1d) 
} 

Это полный код процесса потокового искрового:

package example.spark 

import java.io.File 
import java.util.Date 

import play.api.libs.json._ 
import com.google.gson.{Gson,GsonBuilder, JsonParser} 
import org.apache.spark.streaming.{Seconds, StreamingContext} 
import org.apache.spark.{SparkConf, SparkContext} 
import org.apache.spark.sql.SQLContext 
import org.apache.spark.sql.functions._ 

import com.mongodb.casbah.Imports._ 
import com.mongodb.QueryBuilder 
import com.mongodb.casbah.MongoClient 
import com.mongodb.casbah.commons.{MongoDBList, MongoDBObject} 

import reactivemongo.api.MongoDriver 
import reactivemongo.api.collections.default.BSONCollection 
import reactivemongo.bson.BSONDocument 

import org.apache.spark.streaming.kafka._ 
import kafka.serializer.StringDecoder 
import example.model._ 

import example.utils.Recommender 

/** 
* Collect at least the specified number of json amazon products in order to feed recomedation system and feed mongo instance with results. 

Usage: ./amazon-kafka-connector 127.0.0.1:9092 amazonRatingsTopic 

on mongo shell: 

use alonsodb; 
db.amazonRatings.find(); 
*/ 
object AmazonKafkaConnector { 

private var numAmazonProductCollected = 0L 
private var partNum = 0 
private val numAmazonProductToCollect = 10000000 

//this settings must be in reference.conf 
private val Database = "alonsodb" 
private val ratingCollection = "amazonRatings" 
private val MongoHost = "127.0.0.1" 
private val MongoPort = 27017 
private val MongoProvider = "com.stratio.datasource.mongodb" 

private val jsonParser = new JsonParser() 
private val gson = new GsonBuilder().setPrettyPrinting().create() 

private def prepareMongoEnvironment(): MongoClient = { 
    val mongoClient = MongoClient(MongoHost, MongoPort) 
    mongoClient 
} 

private def closeMongoEnviroment(mongoClient : MongoClient) = { 
    mongoClient.close() 
    println("mongoclient closed!") 
} 

private def cleanMongoEnvironment(mongoClient: MongoClient) = { 
    cleanMongoData(mongoClient) 
    mongoClient.close() 
} 

private def cleanMongoData(client: MongoClient): Unit = { 
    val collection = client(Database)(ratingCollection) 
    collection.dropCollection() 
} 

def main(args: Array[String]) { 
// Process program arguments and set properties 

if (args.length < 2) { 
    System.err.println("Usage: " + this.getClass.getSimpleName + " <brokers> <topics>") 
    System.exit(1) 
} 

val Array(brokers, topics) = args 

println("Initializing Streaming Spark Context and kafka connector...") 
// Create context with 2 second batch interval 
val sparkConf = new SparkConf().setAppName("AmazonKafkaConnector") 
           .setMaster("local[4]") 
           .set("spark.driver.allowMultipleContexts", "true") 

val sc = new SparkContext(sparkConf) 
val sqlContext = new SQLContext(sc) 
sc.addJar("target/scala-2.10/blog-spark-recommendation_2.10-1.0-SNAPSHOT.jar") 
val ssc = new StreamingContext(sparkConf, Seconds(2)) 
//this checkpointdir should be in a conf file, for now it is hardcoded! 
val streamingCheckpointDir = "/Users/aironman/my-recommendation-spark-engine/checkpoint" 
ssc.checkpoint(streamingCheckpointDir) 

// Create direct kafka stream with brokers and topics 
val topicsSet = topics.split(",").toSet 
val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers) 
val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet) 
println("Initialized Streaming Spark Context and kafka connector...") 

//create recomendation module 
println("Creating rating recommender module...") 
val ratingFile= "ratings.csv" 
val recommender = new Recommender(sc,ratingFile) 
println("Initialized rating recommender module...") 
//THIS IS THE MOST INTERESTING PART AND WHAT I NEED! 
//THE SOLUTION IS NOT PROBABLY THE MOST EFFICIENT, BECAUSE I HAD TO 
//USE DATAFRAMES, ARRAYs and SEQs BUT IS FUNCTIONAL! 
try{ 
messages.foreachRDD(rdd => { 
val count = rdd.count() 
if (count > 0){ 
    val json= rdd.map(_._2) 
    val dataFrame = sqlContext.read.json(json) //converts json to DF 
    val myRow = dataFrame.select(dataFrame("userId"),dataFrame("productId"),dataFrame("rating")).take(count.toInt) 
    println("myRow is: " + myRow) 

    val myAmazonRating = AmazonRating(myRow(0).getString(0), myRow(0).getString(1), myRow(0).getDouble(2)) 
    println("myAmazonRating is: " + myAmazonRating.toString) 
    val arrayAmazonRating = Array(myAmazonRating) 
    //this method needs Seq[AmazonRating] 
    recommender.predictWithALS(arrayAmazonRating.toSeq) 
    }//if 
})  
}catch{ 
    case e: IllegalArgumentException => {println("illegal arg. exception")}; 
    case e: IllegalStateException => {println("illegal state exception")}; 
    case e: ClassCastException  => {println("ClassCastException")}; 
    case e: Exception    => {println(" Generic Exception")}; 
}finally{ 

    println("Finished taking data from kafka topic...") 
} 

ssc.start() 
ssc.awaitTermination() 

println("Finished!") 
} 
} 

Спасибо всем, люди, @ Yuval, @Emecas и @ Riccardo.cardin.

метод подписи Recommender.predict выглядит следующим образом:

def predict(ratings: Seq[AmazonRating]) = { 
    // train model 
    val myRatings = ratings.map(toSparkRating) 
    val myRatingRDD = sc.parallelize(myRatings) 

    val startAls = DateTime.now 
    val model = ALS.train((sparkRatings ++ myRatingRDD).repartition(NumPartitions), 10, 20, 0.01) 

    val myProducts = myRatings.map(_.product).toSet 
    val candidates = sc.parallelize((0 until productDict.size).filterNot(myProducts.contains)) 

    // get ratings of all products not in my history ordered by rating (higher first) and only keep the first NumRecommendations 
    val myUserId = userDict.getIndex(MyUsername) 
    val recommendations = model.predict(candidates.map((myUserId, _))).collect 
    val endAls = DateTime.now 
    val result = recommendations.sortBy(-_.rating).take(NumRecommendations).map(toAmazonRating) 
    val alsTime = Seconds.secondsBetween(startAls, endAls).getSeconds 

    println(s"ALS Time: $alsTime seconds") 
    result 
    } 

// Я думаю, что я был как можно более четко, скажите мне, если вам нужно что-нибудь еще и спасибо за ваше терпение учить меня @Yuval

ответ

1

Диагноз

IllegalStateException предполагает, что вы работаете над StreamingContext, что уже ACTIVE или STOPPED. see details here (lines 218-231)

java.lang.IllegalStateException: Adding new inputs, transformations, and output operations after starting a context is not supported 

Обзор Код

Наблюдая код AmazonKafkaConnector, вы делаете map, filter и foreachRDD в другой foreachRDD за тот же DirectStream объект под названием: messages

Общие рекомендации:

быть функциональным мой друг, путем деления логики на мелкие кусочки для каждой из задач, которые вы хотите выполнить:

  • Streaming
  • ML Рекомендацию
  • Постоянство
  • т.д.

Это поможет вам лучше понять и отладить трубопровод искры, который вы хотите реализовать.

1

проблема заключается в том, что заявление rdd.take(count.toInt) возвращать Array[T], как указано here

def take(num: Int): Array[T] 

Возьмите первые цифровые элементы RDD.

Вы говорите своему RDD, чтобы взять первые n элементов в нем. Тогда, иначе, чем вы предполагаете, у вас нет объекта типа Tuple2, но массив.

Если вы хотите печатать каждый элемент массива, вы можете использовать метод mkString, определенный на Array типа, чтобы получить единый String со всеми элементами массива.

1

Похоже, что вы пытаетесь сделать это просто map через DStream. A map операция представляет собой проекцию от A на тип B, где A является String (что вы получаете от Kafka), а B - ваш случай класса AmazonRating.

Давайте добавим empty значение для вашего AmazonRating:

case class AmazonRating(userId: String, productId: String, rating: Double) 

object AmazonRating { 
    lazy val empty: AmazonRating = AmazonRating("-1", "-1", -1d) 
} 

Теперь давайте разбирать JSONs:

val messages = KafkaUtils 
    .createDirectStream[String, String, StringDecoder, StringDecoder] 
    (ssc, kafkaParams, topicsSet) 

messages 
     .map { case (_, jsonRating) => 
     val format = Json.format[AmazonRating] 
     val jsValue = Json.parse(record) 
     format.reads(jsValue) match { 
      case JsSuccess(rating, _) => rating 
      case JsError(_) => AmazonRating.empty 
     } 
     .filter(_ != AmazonRating.empty) 
     .foreachRDD(_.foreachPartition(it => recommender.predict(it.toSeq))) 
+0

Хорошо, но ошибка просто в том, что он путал объект типа 'Array [T]' с объектом типа 'Tuple2', не так ли? –

+0

@ riccardo.cardin Нет, у него есть 'Array [(String, String)]', потому что он не десериализовал данные, поступающие из Кафки. У него есть кортеж, потому что Кафка посылает как ключ, так и ценность, и он ничего не сделал, чтобы превратить его в класс своего дела. –

+0

У него есть 'Array' из-за этого выражения' val someMessages = rdd.take (count.toInt) '. –

Смежные вопросы