Я пытаюсь настроить разъем Spark-MongoDB в своей тестовой среде. Мой StreamingContext настроен так:Spark Mongodb Connector Unit Tests
val conf = new SparkConf()
.setMaster("local[*]")
.setAppName("test")
.set("spark.mongodb.input.uri", "mongodb://localhost:27017/testdb.testread")
.set("spark.mongodb.output.uri", "mongodb://localhost:27017/testdb.testwrite")
lazy val ssc = new StreamingContext(conf, Seconds(1))
Всякий раз, когда я пытаюсь настроить DStream так:
val records = new ConstantInputDStream(ssc, ssc.sparkContext.makeRDD(seq))
я получаю удар с этой ошибкой
java.lang.IllegalStateException: невозможно вызвать методы на остановленном SparkC КОНТЕКСТ.
Похоже, что контекст начинается, а затем останавливается немедленно, но я не могу понять, почему. Журнал не дает никаких ошибок. Это где она заканчивается началом, а затем сразу же останавливается:
DEBUG] 2016-10-06 18: 29: 51625 org.spark_project.jetty.util.component.AbstractLifeCycle setStarted - НАЧАЛА @ 4858ms osjsServletContextHandler @ 33b85bc {/metrics/json, null, AVAILABLE} [WARN] 2016-10-06 18: 29: 51,660 org.apache.spark.streaming.StreamingContext logWarning - StreamingContext еще не запущен [DEBUG] 2016-10-06 18 : 29: 51,662 org.spark_project.jetty.util.component.AbstractLifeCycle setStopping - stopping [email protected] [DEBUG] 2016-10-06 18: 29: 51,664 org.spark_project.jetty.server .Server doStop - Изящное завершение работы [email protected] по
Когда я извлекаю соединение MongoDB его установка не выключается и все в порядке (за исключением я не могу читать/писать в Монго :()
EDIT: Это тест, где я пытаюсь пишите в манго. Однако мой тестовый пакет не срабатывает даже до того, как я доберусь до этого момента.
"read from kafka queue" in new SparkScope{
val stream = KafkaUtils.createDirectStream[String, String](
ssc,
PreferConsistent,
Subscribe[String, String](List("topic"),
Map[String, Object](
"bootstrap.servers"->s"localhost:${kServer.kafkaPort}",
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> "testing",
"auto.offset.reset" -> "latest",
"enable.auto.commit" -> (false: java.lang.Boolean)
)
)
)
val writeConfig = WriteConfig(Map(
"collection"->"testcollection",
"writeConcern.w"->"majority",
"db"->"testdb"
), Some(WriteConfig(ssc.sparkContext)))
stream.map(r => (r.key.toLong, r.value.toLong))
.reduceByKey(_+_)
.map{case (k,v) => {
val d = new Document()
d.put("key", k)
d.put("value", v)
d
}}
.foreachRDD(rdd => rdd.saveToMongoDB(writeConfig))
ssc.start
(1 until 10).foreach(x => producer.send(KafkaProducerRecord("topic", "1", "1")))
ssc.awaitTerminationOrTimeout(1500)
ok
}
Неудача происходит здесь, когда я пытаюсь создать поток из коллекции лестницы:
"return a single record with the correct sum" in new SparkScope{
val stream = new ConstantInputDStream(ssc, ssc.sparkContext.makeRDD(seq))
val m = HashMap.empty[Long,Long]
FlattenTimeSeries.flatten(stream).foreachRDD(rdd => m ++= rdd.collect())
ssc.start()
ssc.awaitTerminationOrTimeout(1500)
m.size === 1 and m(1) === 20
}
Класс SparkScope только создает StreamingContext, что я показал выше и вызывает ssc.stop()
после испытания
Очень странно - в примере вы ничего не делаете с Монго - можете ли вы его расширить? – Ross