2015-11-20 2 views
0

Я использую rxscala и нашел очень тонкую проблему, и мой код Simplied на следующее:Почему «собирать» заставляют наблюдателя не получать уведомления?

import rx.lang.scala.Observable 
import rx.lang.scala.subjects.PublishSubject 

object SubtleBug extends App { 

    case class Projects(projects: List[Project] = Nil) 
    case class Project(name: String, docs: List[Doc] = Nil) 
    case class Doc(path: String, baseContent: String) 

    sealed trait ServerEvent 
    case class ProjectNames(projects: Seq[String]) extends ServerEvent 
    case class NewDocument(projectName: String, path: String, version: Int, content: String) extends ServerEvent 

    val receivedEvents = PublishSubject[ServerEvent] 

    new Thread(new Runnable { 
    override def run(): Unit = { 
     val events = Seq(
     new ProjectNames(Seq("p1", "p2")), 
     NewDocument("p1", "/aaa", 1, "my-content"), 
     NewDocument("p1", "/bbb", 1, "my-content") 
    ) 
     events.foreach { event => 
     receivedEvents.onNext(event) 
     Thread.sleep(200) 
     } 
    } 
    }).start() 

    lazy val projects: Observable[Option[Projects]] = receivedEvents.scan(Option.empty[Projects]) { 
    case (_, ProjectNames(names)) => { 
     Some(Projects(names.map(name => Project(name)).toList)) 
    } 
    case (Some(ps), NewDocument(projectName, docPath, version, content)) => { 
     val doc = Doc(docPath, content) 
     val newPs = ps.copy(projects = ps.projects.map { 
     case project if project.name == projectName => project.copy(docs = project.docs ::: List(doc)) 
     case p => p 
     }) 
     Some(newPs) 
    } 
    case _ => None 
    }.collect({ 
    case Some(p) => Some(p) 
    }) 

    projects.foreach(ps => { 
    println("### 111: " + ps.map(_.projects)) 
    projects.foreach(x => println("### 222: " + ps.map(_.projects))) // !!!(2) 
    }) 

    Thread.sleep(2000) 

} 

Ключевым моментом является !!!(2) линия, которая находится внутри самого projects.

Он печатает следующее:

### 111: Some(List(Project(p1,List()), Project(p2,List()))) 
### 111: Some(List(Project(p1,List(Doc(/aaa,my-content))), Project(p2,List()))) 
### 111: Some(List(Project(p1,List(Doc(/aaa,my-content), Doc(/bbb,my-content))), Project(p2,List()))) 

Проблема заключается в том нет ### 222: линии!

Но если изменить collect часть, и добавив None случай:

.collect({ 
    case Some(p) => Some(p) 
    case None => None // added case 
}) 

будет печататься ### 222: линии, как я ожидал:

### 111: None 
### 111: Some(List(Project(p1,List()), Project(p2,List()))) 
### 111: Some(List(Project(p1,List(Doc(/aaa,my-content))), Project(p2,List()))) 
### 222: None 
### 222: None 
### 222: Some(List(Project(p1,List()), Project(p2,List()))) 
### 222: Some(List(Project(p1,List()), Project(p2,List()))) 
### 111: Some(List(Project(p1,List(Doc(/aaa,my-content), Doc(/bbb,my-content))), Project(p2,List()))) 
### 222: None 
### 222: Some(List(Project(p1,List()), Project(p2,List()))) 
### 222: Some(List(Project(p1,List(Doc(/aaa,my-content))), Project(p2,List()))) 
### 222: Some(List(Project(p1,List(Doc(/aaa,my-content))), Project(p2,List()))) 

я не могу понять, зачем.

PS: Вы можете клонировать код здесь: https://github.com/freewind/rxscala-test/blob/master/src/main/scala/myrx/SubtleBug.scala

ответ

0

Там есть состояние гонки в ваших кодах. PublishSubject выведет любые события, прежде чем подписаться на них. Поэтому, если receivedEvents.onNext(event) работает до projects.foreach, событие будет удалено. И в scan, если первое событие отброшено, совпадение шаблонов больше не будет работать.

Вы можете использовать ReplaySubject, чтобы исправить положение.

+0

Это странно, я просто попробовал код, и он выведет строки '### 111: Некоторые (...' как в моем вопросе. Возможно, вы пропустите метод [receivedEvents.scan] (http: // reactivex .io/documentation/operations/scan.html), что похоже на 'foldLeft', поэтому я могу передать' Option.empty [Projects] 'как совпадение с ним – Freewind

Смежные вопросы