В Spark Streaming каждый раз, когда получено новое сообщение, модель будет использоваться для прогнозирования sth на основе этого нового сообщения. Но время идет, модель может быть изменена по какой-то причине, поэтому я хочу, чтобы повторно загрузить модель всякий раз, когда новое сообщение приходит. Мой код выглядит следующим образом[Spark Streaming] Как загрузить модель каждый раз, когда приходит новое сообщение?
def loadingModel(@transient sc:SparkContext)={
val model=LogisticRegressionModel.load(sc, "/home/zefu/BIA800/LRModel")
model
}
var error=0.0
var size=0.0
implicit def bool2int(b:Boolean) = if (b) 1 else 0
def updateState(batchTime: Time, key: String, value: Option[String], state: State[Array[Double]]): Option[(String, Double,Double)] = {
val model=loadingModel(sc)
val parts = value.getOrElse("0,0,0,0").split(",").map { _.toDouble }
val pairs = LabeledPoint(parts(0), Vectors.dense(parts.tail))
val prediction = model.predict(pairs.features)
val wrong= prediction != pairs.label
error = state.getOption().getOrElse(Array(0.0,0.0))(0) + 1.0*(wrong:Int)
size=state.getOption().getOrElse(Array(0.0,0.0))(1) + 1.0
val output = (key, error,size)
state.update(Array(error,size))
Some(output)
}
val stateSpec = StateSpec.function(updateState _)
.numPartitions(1)
setupLogging()
val kafkaParams = Map("metadata.broker.list" -> "localhost:9092")
val topics = List("test").toSet
val lines = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
ssc, kafkaParams, topics).mapWithState(stateSpec)
Когда я запускаю этот код, там будет исключением, как это
Exception in thread "main" org.apache.spark.SparkException: Task not serializable
Если вам нужна дополнительная информация, то дайте мне знать. Спасибо!
Вы можете разместить полную трассировку стека? также вы пытаетесь использовать любой класс, который не сериализуется внутри искровых преобразований, таких как карта, фильтр? – Shankar
@Shankar Привет, Если я просто загружаю модель ('val model = LogisticRegressionModel.load (sc,"/home/zefu/BIA800/LRModel ")') вне 'updateState' без определения' loadMode'l, он отлично работает. Я думаю, проблема связана с 'sc' –
@Shankar, и я добавляю там еще код: P –