2016-04-22 2 views
1

Я пытаюсь получить лучшее представление о Scala MongoDB SrcПонимание implicits и подписаться в MongoDB Scala ЦСИ

Использование драйвера MongoDB Scala (АНП документ: http://mongodb.github.io/mongo-scala-driver/)

, когда я использую

val collection: MongoCollection[Document] = database.getCollection("mycollection"); 

     val observable: Observable[Completed] = collection.insertOne(doc) 


     observable.subscribe(new Observer[Completed] { 
     override def onNext(result: Completed): Unit = println("Inserted") 
     override def onError(e: Throwable): Unit = println("Failed") 
     override def onComplete(): Unit = println("Completed") 
     }) 

это неявный метод

/** 
    * Subscribes to the [[Observable]] and requests `Long.MaxValue`. 
    * 
    * Uses the default or overridden `onNext`, `onError`, `onComplete` partial functions. 
    * 
    * @param doOnNext anonymous function to apply to each emitted element. 
    * @param doOnError anonymous function to apply if there is an error. 
    * @param doOnComplete anonymous function to apply on completion. 
    */ 
    def subscribe(doOnNext: T => Any, doOnError: Throwable => Any, doOnComplete:() => Any): Unit = { 
     observable.subscribe(new Observer[T] { 
     override def onSubscribe(subscription: Subscription): Unit = subscription.request(Long.MaxValue) 

     override def onNext(tResult: T): Unit = doOnNext(tResult) 

     override def onError(throwable: Throwable): Unit = doOnError(throwable) 

     override def onComplete(): Unit = doOnComplete() 

     }) 
    } 

ЦСИ: https://github.com/mongodb/mongo-scala-driver/blob/master/driver/src/main/scala/org/mongodb/scala/ObservableImplicits.scala

вызывается из:

/** 
    * Request `Observable` to start streaming data. 
    * 
    * This is a "factory method" and can be called multiple times, each time starting a new [[Subscription]]. 
    * Each `Subscription` will work for only a single [[Observer]]. 
    * 
    * If the `Observable` rejects the subscription attempt or otherwise fails it will signal the error via [[Observer.onError]]. 
    * 
    * @param observer the `Observer` that will consume signals from this `Observable` 
    */ 
    def subscribe(observer: Observer[_ >: T]): Unit 

ЦСИ: https://github.com/mongodb/mongo-scala-driver/blob/master/driver/src/main/scala/org/mongodb/scala/Observable.scala

кажется, что вызов подписываются вызывается новый поток (как это называется подписка), но я не вижу, где этот новый поток называется от src?

Implicits используются для достижения этой «проводки», которая вызывает метод неявной подписки, когда я использую observable.subscribe(new Observer[Completed] {....?

Обновление:

Используя этот код:

import org.mongodb.scala.MongoClient; 
import org.mongodb.scala.bson.collection.immutable.Document; 
import org.mongodb.scala._ 
import org.scalatest._ 
import Matchers._ 
import org.mongodb.scala._ 

class MongoSpec extends FlatSpec with Matchers { 

    "Test MongoDb" should "insert" in { 
    { 
     val mongoClient: MongoClient = MongoClient() 
     val database: MongoDatabase = mongoClient.getDatabase("scala-poc"); 

     val doc: Document = Document("_id" -> 6, "name" -> "MongoDB", "type" -> "database", 
     "count" -> 1, "info" -> Document("x" -> 203, "y" -> 100)) 

     val collection: MongoCollection[Document] = database.getCollection("documents"); 

     val observable: Observable[Completed] = collection.insertOne(doc) 

     observable.subscribe(new Observer[Completed] { 
     override def onNext(result: Completed): Unit = println("Inserted") 
     override def onError(e: Throwable): Unit = println(" \n\nFailed " + e + "\n\n") 
     override def onComplete(): Unit = println("Completed") 
     }) 

     mongoClient.close(); 

    } 

    } 
} 

ниже, вызывает исключение:

Failed com.mongodb.MongoClientException: Shutdown in progress 

mongoClient.close(); вызывается до завершения метода insertOne.

Таким образом, метод insertOne или подписки является асинхронным?

+1

Для этого вы должны посмотреть исходный код драйвера Java, а не на Scala, который является простой оболочкой вокруг него. Но, конечно, 'insertOne' является асинхронным, потому что драйвер Scala использует драйвер async Java. –

ответ

0
  1. Нет, subscribe(doOnNext, doOnError, doOnComplete) вызовы subscribe(observer) (как вы можете видеть от реализации, указанной в вашем вопросе). Поэтому, если бы это было вызвано оттуда, вы получили бы бесконечный цикл. «Проводка» используется, когда вы пишете что-то вроде observer.subscribe(x => println(s"next = $x"), error => error.printStackTrace(),() => {}).

  2. No, subscribe не создает новую тему. Классы, реализующие Observable, в основном переносят классы из драйвера Java MongoDB и вызывают их собственные методы subscribe, например. override def subscribe(observer: Observer[_ >: TResult]): Unit = observe(wrapped).subscribe(observer). Эти методы subscribe также не запускают новые потоки: см. https://mongodb.github.io/mongo-java-driver/3.1/driver-async/reference/observables/ для некоторых объяснений.

+0

, пожалуйста, ознакомьтесь с обновлением, появляется асинхронная операция, im не уверен где. –

Смежные вопросы