2016-10-06 2 views
0

Я пытаюсь настроить разъем Spark-MongoDB в своей тестовой среде. Мой StreamingContext настроен так:Spark Mongodb Connector Unit Tests

val conf = new SparkConf() 
      .setMaster("local[*]") 
      .setAppName("test") 
      .set("spark.mongodb.input.uri", "mongodb://localhost:27017/testdb.testread") 
      .set("spark.mongodb.output.uri", "mongodb://localhost:27017/testdb.testwrite") 

lazy val ssc = new StreamingContext(conf, Seconds(1))

Всякий раз, когда я пытаюсь настроить DStream так:

val records = new ConstantInputDStream(ssc, ssc.sparkContext.makeRDD(seq))

я получаю удар с этой ошибкой

java.lang.IllegalStateException: невозможно вызвать методы на остановленном SparkC КОНТЕКСТ.

Похоже, что контекст начинается, а затем останавливается немедленно, но я не могу понять, почему. Журнал не дает никаких ошибок. Это где она заканчивается началом, а затем сразу же останавливается:

DEBUG] 2016-10-06 18: 29: 51625 org.spark_project.jetty.util.component.AbstractLifeCycle setStarted - НАЧАЛА @ 4858ms osjsServletContextHandler @ 33b85bc {/metrics/json, null, AVAILABLE} [WARN] 2016-10-06 18: 29: 51,660 org.apache.spark.streaming.StreamingContext logWarning - StreamingContext еще не запущен [DEBUG] 2016-10-06 18 : 29: 51,662 org.spark_project.jetty.util.component.AbstractLifeCycle setStopping - stopping [email protected] [DEBUG] 2016-10-06 18: 29: 51,664 org.spark_project.jetty.server .Server doStop - Изящное завершение работы [email protected] по

Когда я извлекаю соединение MongoDB его установка не выключается и все в порядке (за исключением я не могу читать/писать в Монго :()

EDIT: Это тест, где я пытаюсь пишите в манго. Однако мой тестовый пакет не срабатывает даже до того, как я доберусь до этого момента.

"read from kafka queue" in new SparkScope{ 

    val stream = KafkaUtils.createDirectStream[String, String](
    ssc, 
    PreferConsistent, 
    Subscribe[String, String](List("topic"), 
     Map[String, Object](
     "bootstrap.servers"->s"localhost:${kServer.kafkaPort}", 
     "key.deserializer" -> classOf[StringDeserializer], 
     "value.deserializer" -> classOf[StringDeserializer], 
     "group.id" -> "testing", 
     "auto.offset.reset" -> "latest", 
     "enable.auto.commit" -> (false: java.lang.Boolean) 
    ) 
    ) 
) 
    val writeConfig = WriteConfig(Map(
    "collection"->"testcollection", 
    "writeConcern.w"->"majority", 
    "db"->"testdb" 
), Some(WriteConfig(ssc.sparkContext))) 

    stream.map(r => (r.key.toLong, r.value.toLong)) 
    .reduceByKey(_+_) 
    .map{case (k,v) => { 
     val d = new Document() 
     d.put("key", k) 
     d.put("value", v) 
     d 
    }} 
    .foreachRDD(rdd => rdd.saveToMongoDB(writeConfig)) 

    ssc.start 
    (1 until 10).foreach(x => producer.send(KafkaProducerRecord("topic", "1", "1"))) 
    ssc.awaitTerminationOrTimeout(1500) 
    ok 
} 

Неудача происходит здесь, когда я пытаюсь создать поток из коллекции лестницы:

"return a single record with the correct sum" in new SparkScope{ 
    val stream = new ConstantInputDStream(ssc, ssc.sparkContext.makeRDD(seq)) 
    val m = HashMap.empty[Long,Long] 
    FlattenTimeSeries.flatten(stream).foreachRDD(rdd => m ++= rdd.collect()) 
    ssc.start() 
    ssc.awaitTerminationOrTimeout(1500) 
    m.size === 1 and m(1) === 20 
    } 

Класс SparkScope только создает StreamingContext, что я показал выше и вызывает ssc.stop() после испытания

+0

Очень странно - в примере вы ничего не делаете с Монго - можете ли вы его расширить? – Ross

ответ

1

Получил это. Проблема заключалась в том, что переменная SparkConf не была объявлена ​​lazy, но StreamingContext был. Я не уверен, почему это имеет значение, но это так. Исправлена.

+0

Привет, пользователь1748268, я пытался сохранить данные в MongoDB, но все еще не удался. Можете ли вы поделиться своей упрощенной формой запуска проекта, чтобы я мог посмотреть и продолжить. Спасибо заранее, Cheers :) –

+0

Привет @DynamicRemo. Код, который я написал выше, в основном завершен и фактически отлично работает. Проблема, с которой я столкнулась, была связана с specs2 (моя область действия была абстрактным классом, а не чертой). Какая конкретная проблема у вас есть? Может быть, я могу помочь – Tim