2014-12-27 5 views
3

У меня есть приложение, выполняющее наблюдаемый интервал с несколькими наблюдателями. Каждые 0,5 секунды интервал загружает некоторые данные XML с веб-сервера, а затем наблюдатели выполняют некоторую обработку, специфичную для конкретного приложения, в фоновом потоке. Как только данные больше не нужны, подписчики и наблюдаемый интервал будут удалены, поэтому OnNext/OnCompleted/OnError наблюдателя больше не будет вызываться. Все идет нормально.Подождите, пока наблюдатель Rx не будет завершен без блокировки

Моя проблема: в некоторых редких случаях возможно, что после вызова Dispose мой метод OnNext моего наблюдателя все еще работает! Прежде чем приступать к дальнейшим операциям после утилизации, я хотел бы убедиться, что OnNext завершен.

Мое текущее решение: я представил поле шкафчика в классе наблюдателя (см. Код). После утилизации я пытаюсь приобрести замок и продолжать только после того, как замок был приобретен. Хотя это решение работает (?), Это как-то просто кажется неправильным для меня.

Вопрос: Есть ли более элегантный, более «Rx Way», чтобы решить эту проблему?

using System; 
using System.Collections.Generic; 
using System.Linq; 
using System.Reactive.Concurrency; 
using System.Reactive.Linq; 
using System.Text; 
using System.Threading; 
using System.Threading.Tasks; 

namespace RxExperimental 
{ 
    internal sealed class MyXmlDataFromWeb 
    { 
     public string SomeXmlDataFromWeb { get; set; } 
    } 

    internal sealed class MyObserver : IObserver<MyXmlDataFromWeb> 
    { 
     private readonly object _locker = new object(); 
     private readonly string _observerName; 

     public MyObserver(string observerName) { 
      this._observerName = observerName; 
     } 

     public object Locker { 
      get { return this._locker; } 
     } 

     public void OnCompleted() { 
      lock (this._locker) { 
       Console.WriteLine("{0}: Completed.", this._observerName); 
      } 
     } 

     public void OnError(Exception error) { 
      lock (this._locker) { 
       Console.WriteLine("{0}: An error occured: {1}", this._observerName, error.Message); 
      } 
     } 

     public void OnNext(MyXmlDataFromWeb value) { 
      lock (this._locker) { 
       Console.WriteLine(" {0}: OnNext running on thread {1}... ", this._observerName, Thread.CurrentThread.ManagedThreadId); 
       Console.WriteLine(" {0}: XML received: {1}", this._observerName, value.SomeXmlDataFromWeb); 
       Thread.Sleep(5000); // simulate some long running operation 
       Console.WriteLine(" {0}: OnNext running on thread {1}... Done.", this._observerName, Thread.CurrentThread.ManagedThreadId); 
      } 
     } 
    } 

    internal sealed class Program 
    { 
     private static void Main() { 
      const int interval = 500; 
      // 
      var dataSource = Observable.Interval(TimeSpan.FromMilliseconds(interval), NewThreadScheduler.Default).Select(_ => { 
       var data = new MyXmlDataFromWeb { 
        SomeXmlDataFromWeb = String.Format("<timestamp>{0:yyyy.MM.dd HH:mm:ss:fff}</timestamp>", DateTime.Now) 
       }; 
       return data; 
      }).Publish(); 
      // 
      var observer1 = new MyObserver("Observer 1"); 
      var observer2 = new MyObserver("Observer 2"); 
      // 
      var subscription1 = dataSource.ObserveOn(NewThreadScheduler.Default).Subscribe(observer1); 
      var subscription2 = dataSource.ObserveOn(NewThreadScheduler.Default).Subscribe(observer2); 
      // 
      var connection = dataSource.Connect(); 
      // 
      Console.WriteLine("Press any key to cancel ..."); 
      Console.ReadLine(); 
      // 
      subscription1.Dispose(); 
      subscription2.Dispose(); 
      connection.Dispose(); 
      // 
      lock (observer1.Locker) { 
       Console.WriteLine("Observer 1 completed."); 
      } 
      lock (observer2.Locker) { 
       Console.WriteLine("Observer 2 completed."); 
      } 
      // 
      Console.WriteLine("Can only be executed, after all observers completed."); 
     } 
    } 
} 
+0

Я бы сказал, что вы не работаете в духе Rx, если вы выполняете долго блокирующей работу в вашем OnNext Перезвони. Поскольку Rx эффективно является конвейером обратных вызовов, вы будете блокировать своего источника-производителя. Возможно, вам захочется либо рассмотреть проект передачи сообщений, либо взглянуть на то, чтобы заставить обработчики OnNext ввести еще один уровень асинхронности (см. Вложенные наблюдаемые последовательности). –

+0

Кроме того, я предлагаю вам не реализовывать IObserver (или, в этом отношении, IObservable ) интерфейсы. Вместо этого создайте запросы с операторами. –

ответ

3

Да, есть более Rx-способ сделать это.

Первое замечание состоит в том, что отписка от наблюдаемого потока по существу не зависит от того, что в настоящее время происходит в наблюдателе. На самом деле отзывов нет. Поскольку у вас есть требование, которое вы знаете окончательно, когда наблюдения закончились, вам нужно смоделировать это в свой наблюдаемый поток. Другими словами, вместо отказа от подписки на поток, вы должны заполнить поток, чтобы вы могли наблюдать за событием OnComplete. В вашем случае вы можете использовать TakeUntil, чтобы закончить наблюдаемый вместо того, чтобы отменить подписку на него.

Второе замечание состоит в том, что ваша основная программа должна наблюдать, когда ваш «наблюдатель» заканчивает свою работу. Но так как вы сделали своего «наблюдателя» фактическим IObservable, у вас действительно нет способа сделать это. Это общий источник замешательства, который я вижу, когда люди впервые начинают использовать Rx. Если вы моделируете своего «наблюдателя» как еще одну ссылку в наблюдаемой цепочке, ваша основная программа может наблюдать. В частности, ваш «наблюдатель» представляет собой не что иное, как операцию сопоставления (с побочными эффектами), которая отображает входящие данные Xml в «сделанные» сообщения.

Так что, если вы реорганизовывать код, вы можете получить то, что вы хотите ...

public class MyObserver 
{ 
    private readonly string _name; 

    public MyObserver(string name) { _name = name; } 

    public IObservable<Unit> Handle(IObservable<MyXmlDataFromWeb source) 
    { 
     return source.Select(value => 
     { 
      Thread.Sleep(5000); // simulate work 
      return Unit.Default; 
     }); 
    } 
} 

// main 
var endSignal = new Subject<Unit>(); 
var dataSource = Observable 
    .Interval(...) 
    .Select(...) 
    .TakeUntil(endSignal) 
    .Publish(); 
var observer1 = new MyObserver("Observer 1"); 
var observer2 = new MyObserver("Observer 2"); 
var results1 = observer1.Handle(dataSource.ObserveOn(...)); 
var results2 = observer2.Handle(dataSource.ObserveOn(...)); 
// since you just want to know when they are all done 
// just merge them. 
// use ToTask() to subscribe them and collect the results 
// as a Task 
var processingDone = results1.Merge(results2).Count().ToTask(); 

dataSource.Connect(); 

Console.WriteLine("Press any key to cancel ..."); 
Console.ReadLine(); 

// end the stream 
endSignal.OnNext(Unit.Default); 

// wait for the processing to complete. 
// use await, or Task.Result 
var numProcessed = await processingDone; 
+0

Большое спасибо за ваш вклад. Однако этот подход оставляет несколько вопросов без ответа: 1. Выполняя фактическую работу в методе обработки MyOberver, можно заменить обработчик onNext, как этот подход должен обрабатывать события onError и onCompleted? И как бы вы гарантировали, что работа, выполняемая onError/onCompleted, фактически завершена, прежде чем приступать к дальнейшим операциям после отмены интервала? Конечно, я мог бы добавить одноразовую подписку, но тогда у меня такая же проблема, как раньше. ;-) – dotNZ

+0

2. Выполнение фактической работы с методом Handle MyObserver имеет один недостаток: processingDone.Wait() ожидает, пока все сообщения в последовательности не будут обработаны. Такое поведение нежелательно!Я ожидаю, что он будет игнорировать/удалять все выдающиеся сообщения (если они есть) сразу после отмены интервала (и ждать только завершения текущего запуска onNext/onError/onCompleted). Мой первоначальный подход (с блокировкой) достигает такого поведения, удаляя подписки. Любые идеи? – dotNZ

+0

1 - «MyObserver» добавляет больше операторов после 'Select'. Есть много вариантов в зависимости от ваших точных требований. [.Do()] (http://msdn.microsoft.com/en-us/library/hh229830 (v = vs.103) .aspx) - хорошее место для начала поиска. Rx будет гарантировать выполнение операций до завершения задачи 'processingDone'. – Brandon

Смежные вопросы