2016-10-17 2 views
2

В Spark Streaming каждый раз, когда получено новое сообщение, модель будет использоваться для прогнозирования sth на основе этого нового сообщения. Но время идет, модель может быть изменена по какой-то причине, поэтому я хочу, чтобы повторно загрузить модель всякий раз, когда новое сообщение приходит. Мой код выглядит следующим образом[Spark Streaming] Как загрузить модель каждый раз, когда приходит новое сообщение?

def loadingModel(@transient sc:SparkContext)={ 
    val model=LogisticRegressionModel.load(sc, "/home/zefu/BIA800/LRModel") 
    model 
} 

var error=0.0 
var size=0.0 
implicit def bool2int(b:Boolean) = if (b) 1 else 0 
def updateState(batchTime: Time, key: String, value: Option[String], state: State[Array[Double]]): Option[(String, Double,Double)] = { 
    val model=loadingModel(sc) 
    val parts = value.getOrElse("0,0,0,0").split(",").map { _.toDouble } 
    val pairs = LabeledPoint(parts(0), Vectors.dense(parts.tail)) 
    val prediction = model.predict(pairs.features) 
    val wrong= prediction != pairs.label 
    error = state.getOption().getOrElse(Array(0.0,0.0))(0) + 1.0*(wrong:Int) 
    size=state.getOption().getOrElse(Array(0.0,0.0))(1) + 1.0 
    val output = (key, error,size) 
    state.update(Array(error,size)) 
    Some(output) 
} 
val stateSpec = StateSpec.function(updateState _) 
    .numPartitions(1) 
setupLogging() 
val kafkaParams = Map("metadata.broker.list" -> "localhost:9092") 
val topics = List("test").toSet 
val lines = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
    ssc, kafkaParams, topics).mapWithState(stateSpec) 

Когда я запускаю этот код, там будет исключением, как это

Exception in thread "main" org.apache.spark.SparkException: Task not serializable 

Если вам нужна дополнительная информация, то дайте мне знать. Спасибо!

+0

Вы можете разместить полную трассировку стека? также вы пытаетесь использовать любой класс, который не сериализуется внутри искровых преобразований, таких как карта, фильтр? – Shankar

+0

@Shankar Привет, Если я просто загружаю модель ('val model = LogisticRegressionModel.load (sc,"/home/zefu/BIA800/LRModel ")') вне 'updateState' без определения' loadMode'l, он отлично работает. Я думаю, проблема связана с 'sc' –

+0

@Shankar, и я добавляю там еще код: P –

ответ

0

Когда модель используется в функции DStream, искра, похоже, сериализует объект контекста (поскольку функция загрузки модели использует sc), и она терпит неудачу, потому что объект контекста не является сериализуемым. Одним из способов решения является преобразование DStream в RDD, сбор результата, а затем запуск предсказания/оценки модели в драйвере.

Утилита netcat для имитации потоковой передачи, попробовал следующий код для преобразования DStream в RDD, он работает. Посмотрите, помогает ли это.

val ssc = new StreamingContext(sc,Seconds(10)) 
val lines = ssc.socketTextStream("xxx", 9998) 
val linedstream = lines.map(lineRDD => Vectors.dense(lineRDD.split(" ").map(_.toDouble))) 
val logisModel = LogisticRegressionModel.load(sc, /path/LR_Model") 
linedstream.foreachRDD(rdd => { 
    for(item <- rdd.collect().toArray) { 
    val predictedVal = logisModel.predict(item) 
     println(predictedVal + "|" + item); 
    } 
}) 

Понимать собирать не масштабируется, но если вы думаете, что ваши потоковые сообщения меньше числа для любого интервала, это, вероятно, вариант. Это то, что я вижу в Spark 1.4.0, для более высоких версий, вероятно, есть исправление для этого. Смотрите это, если его полезным,

Save ML model for future usage

Смежные вопросы