У меня есть приложение, выполняющее наблюдаемый интервал с несколькими наблюдателями. Каждые 0,5 секунды интервал загружает некоторые данные XML с веб-сервера, а затем наблюдатели выполняют некоторую обработку, специфичную для конкретного приложения, в фоновом потоке. Как только данные больше не нужны, подписчики и наблюдаемый интервал будут удалены, поэтому OnNext/OnCompleted/OnError наблюдателя больше не будет вызываться. Все идет нормально.Подождите, пока наблюдатель Rx не будет завершен без блокировки
Моя проблема: в некоторых редких случаях возможно, что после вызова Dispose мой метод OnNext моего наблюдателя все еще работает! Прежде чем приступать к дальнейшим операциям после утилизации, я хотел бы убедиться, что OnNext завершен.
Мое текущее решение: я представил поле шкафчика в классе наблюдателя (см. Код). После утилизации я пытаюсь приобрести замок и продолжать только после того, как замок был приобретен. Хотя это решение работает (?), Это как-то просто кажется неправильным для меня.
Вопрос: Есть ли более элегантный, более «Rx Way», чтобы решить эту проблему?
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace RxExperimental
{
internal sealed class MyXmlDataFromWeb
{
public string SomeXmlDataFromWeb { get; set; }
}
internal sealed class MyObserver : IObserver<MyXmlDataFromWeb>
{
private readonly object _locker = new object();
private readonly string _observerName;
public MyObserver(string observerName) {
this._observerName = observerName;
}
public object Locker {
get { return this._locker; }
}
public void OnCompleted() {
lock (this._locker) {
Console.WriteLine("{0}: Completed.", this._observerName);
}
}
public void OnError(Exception error) {
lock (this._locker) {
Console.WriteLine("{0}: An error occured: {1}", this._observerName, error.Message);
}
}
public void OnNext(MyXmlDataFromWeb value) {
lock (this._locker) {
Console.WriteLine(" {0}: OnNext running on thread {1}... ", this._observerName, Thread.CurrentThread.ManagedThreadId);
Console.WriteLine(" {0}: XML received: {1}", this._observerName, value.SomeXmlDataFromWeb);
Thread.Sleep(5000); // simulate some long running operation
Console.WriteLine(" {0}: OnNext running on thread {1}... Done.", this._observerName, Thread.CurrentThread.ManagedThreadId);
}
}
}
internal sealed class Program
{
private static void Main() {
const int interval = 500;
//
var dataSource = Observable.Interval(TimeSpan.FromMilliseconds(interval), NewThreadScheduler.Default).Select(_ => {
var data = new MyXmlDataFromWeb {
SomeXmlDataFromWeb = String.Format("<timestamp>{0:yyyy.MM.dd HH:mm:ss:fff}</timestamp>", DateTime.Now)
};
return data;
}).Publish();
//
var observer1 = new MyObserver("Observer 1");
var observer2 = new MyObserver("Observer 2");
//
var subscription1 = dataSource.ObserveOn(NewThreadScheduler.Default).Subscribe(observer1);
var subscription2 = dataSource.ObserveOn(NewThreadScheduler.Default).Subscribe(observer2);
//
var connection = dataSource.Connect();
//
Console.WriteLine("Press any key to cancel ...");
Console.ReadLine();
//
subscription1.Dispose();
subscription2.Dispose();
connection.Dispose();
//
lock (observer1.Locker) {
Console.WriteLine("Observer 1 completed.");
}
lock (observer2.Locker) {
Console.WriteLine("Observer 2 completed.");
}
//
Console.WriteLine("Can only be executed, after all observers completed.");
}
}
}
Я бы сказал, что вы не работаете в духе Rx, если вы выполняете долго блокирующей работу в вашем OnNext Перезвони. Поскольку Rx эффективно является конвейером обратных вызовов, вы будете блокировать своего источника-производителя. Возможно, вам захочется либо рассмотреть проект передачи сообщений, либо взглянуть на то, чтобы заставить обработчики OnNext ввести еще один уровень асинхронности (см. Вложенные наблюдаемые последовательности). –
Кроме того, я предлагаю вам не реализовывать IObserver (или, в этом отношении, IObservable ) интерфейсы. Вместо этого создайте запросы с операторами. –