2013-05-05 1 views
2

Я новичок в RX и играю с некоторыми образцами, любые идеи относительно того, почему Console.writeLine в Subscribe не вызывается?RX ObserveOn issue

вар набл = Observable.Create (я => {

   while(true) 
      { 
       Thread.Sleep(250); 

       i.OnNext(2.0); 
      } 



      return() => { }; 
     })   
     .SubscribeOn(Scheduler.TaskPool) 
     .ObserveOn(Scheduler.CurrentThread) 
     .Subscribe(i => 
     { 
      Console.WriteLine("Inside Subscribe"); 
     }); 

Если удалить строку

.ObserveOn(Scheduler.CurrentThread) 

все отлично?

Благодаря

+0

Ну что это * есть * текущая тема? И что вы делаете, чтобы позволить планировщику выполнять свои задачи в текущем потоке? Это консольное приложение? –

+0

Текущий поток будет потоком, на который включен подписчик, поэтому Scheduler.TaskPool Thread. Это в приложении WPF – Andy

+0

'ObserveOn' и' SubscribeOn' может быть довольно запутанным - я настоятельно рекомендую прочитать это: http://blogs.msdn.com/b/rxteam/archive/2009/11/21/observable-context -observable-subscribeon-and-observable-observon.aspx – JerKimball

ответ

4

Как я уже упоминал, SubscribeOn и ObserveOn не являются наиболее употребительными методами; Я настоятельно рекомендую прочитать этот весь путь до конца:

http://blogs.msdn.com/b/rxteam/archive/2009/11/21/observable-context-observable-subscribeon-and-observable-observeon.aspx

В принципе, SubscribeOn сообщает системе, что контекст, чтобы сделать фактическое подписываться/отписываться проводку на, в то время как ObserveOn сообщает системе, что контекст «выполнить как» когда новые значения поступают в источник.

Для начала, позвольте мне настроить ваш пример немного:

Console.WriteLine("Start Thread ID:{0}", Thread.CurrentThread.ManagedThreadId); 
var subscription = Observable.Create<double>(i => 
    { 
     Console.WriteLine("Observable thread ID:{0}", Thread.CurrentThread.ManagedThreadId); 
     while(true) 
     { 
      Console.WriteLine("Pushing values from thread {0}", Thread.CurrentThread.ManagedThreadId); 
      Thread.Sleep(250);    
      i.OnNext(2.0); 
     } 
     return() => { }; 
    })   
    .SubscribeOn(Scheduler.TaskPool) 
    .ObserveOn(Scheduler.CurrentThread) 
    .Subscribe(i => 
    { 
     Console.WriteLine("Subscribable thread ID:{0}", Thread.CurrentThread.ManagedThreadId); 
     Console.WriteLine("Inside Subscribe"); 
    });  
Console.ReadLine(); 
subscription.Dispose(); 

Если запустить это, вы увидите что-то вроде:

Start Thread ID:21 
Observable thread ID:23 
Pushing values from thread 23 
Pushing values from thread 23 
Pushing values from thread 23 
Pushing values from thread 23 

Теперь давайте переставить темы мы ObserveOn и SubscribeOn:

.SubscribeOn(Scheduler.CurrentThread) 
.ObserveOn(Scheduler.TaskPool) 

Теперь мы получаем:

Start Thread ID:26 
Observable thread ID:26 
Pushing values from thread 26 
Pushing values from thread 26 
Subscribable thread ID:27 
Inside Subscribe 
Pushing values from thread 26 
Subscribable thread ID:27 
Inside Subscribe 
Pushing values from thread 26 
Subscribable thread ID:27 
Inside Subscribe 
Смежные вопросы