2016-04-27 3 views
1

Я вставив запись в MongoDB:Wrap асинхронный код в блокирующий вызов

val observable: Observable[Completed] = collection.insertOne(doc) 

observable.subscribe(new Observer[Completed] { 
    override def onNext(result: Completed): Unit = { println("Inserted"); } 
    override def onError(e: Throwable): Unit = { println(" \n\nFailed " + e + "\n\n"); fail() } 
    override def onComplete(): Unit = { println("Completed"); } 
}) 

тест проходит, даже несмотря на то, onError обратного вызова. Это связано с тем, что insertOne является асинхронным методом, и тест завершается до вызова onError. Я хотел бы обернуть метод insertOne в блокирующий вызов, поэтому subscribe не вызывается до тех пор, пока не будет установлено значение observable.

Есть ли идиоматический способ достижения этого в Scala?

+0

Вы можете посмотреть на [ReactiveMongo] (http://reactivemongo.org/) – cchantep

ответ

2

Самый простой способ синхронного блокирования асинхронной операции - использовать Await.result на Future. Поскольку MongoCollection.insertOne возвращает Observable[Complete], вы можете использовать неявный ScalaObservable.toFuture на нем:

val observable = collection.insertOne(doc) 
Await.result(observable.toFuture, Duration.Inf) 

observable.subscribe(new Observer[Completed] { 
    override def onNext(result: Completed): Unit = { println("Inserted"); } 
    override def onError(e: Throwable): Unit = { println(" \n\nFailed " + e + "\n\n"); fail() } 
    override def onComplete(): Unit = { println("Completed"); } 
}) 
+0

должен привести содержать наблюдаемый, что подписаться на ? –

+0

'result' должен быть уже' Seq [T] ', что означает экземпляр' Seq [Completed]. –

+0

не должно быть типа Observable [Completed], так как это возвращается 'collection.insertOne (doc)'? –

Смежные вопросы