2013-02-10 4 views
0

У меня возникли трудности с пониманием объекта Subject.Реактивные расширения, тема <T>

Рассмотрим следующий код:

 var sub = new Subject<int>(); 
     sub.Subscribe(x => Console.WriteLine(x)); //subscriber #1   
     sub.Subscribe(x => Console.WriteLine(x)); //subscriber #2   
     sub.OnNext(2); 

Я создаю тему междунар, и когда я исполняю OnNext он называет других абонентов (# 1 и # 2). То, что я не понимаю, это то, что я читаю, что Subject означает объект, который является наблюдаемым и наблюдателем, но как это объясняет, почему, когда я вызываю OnNext, вызываются другие подписчики.

Я бы понял, если OnNext субъекта будет распространять его всем подписчикам = публиковать все остальные (что имеет смысл), но когда я проверил исходный код, я не мог видеть ничего, что делает это, см. Ниже.

Может ли кто-то понять из приведенного ниже кода, что именно делает OnNext (2) распространением на другие подписки? (№ 1, № 2)?

общественного запечатанный класс Тема: ISubject, ISubject, IObserver, IObservable, IDisposable { // Поля частное летучий IObserver _observer;

// Methods 
public Subject() 
{ 
    this._observer = NopObserver<T>.Instance; 
} 

public void Dispose() 
{ 
    this._observer = DisposedObserver<T>.Instance; 
} 

public void OnCompleted() 
{ 
    IObserver<T> comparand = null; 
    IObserver<T> completed = DoneObserver<T>.Completed; 
    do 
    { 
     comparand = this._observer; 
    } 
    while (((comparand != DisposedObserver<T>.Instance) && !(comparand is DoneObserver<T>)) && (Interlocked.CompareExchange<IObserver<T>>(ref this._observer, completed, comparand) != comparand)); 
    comparand.OnCompleted(); 
} 

public void OnError(Exception error) 
{ 
    if (error == null) 
    { 
     throw new ArgumentNullException("error"); 
    } 
    IObserver<T> comparand = null; 
    DoneObserver<T> observer3 = new DoneObserver<T> { 
     Exception = error 
    }; 
    DoneObserver<T> observer2 = observer3; 
    do 
    { 
     comparand = this._observer; 
    } 
    while (((comparand != DisposedObserver<T>.Instance) && !(comparand is DoneObserver<T>)) && (Interlocked.CompareExchange<IObserver<T>>(ref this._observer, observer2, comparand) != comparand)); 
    comparand.OnError(error); 
} 

public void OnNext(T value) 
{ 
    this._observer.OnNext(value); 
} 

public IDisposable Subscribe(IObserver<T> observer) 
{ 
    if (observer == null) 
    { 
     throw new ArgumentNullException("observer"); 
    } 
    IObserver<T> comparand = null; 
    IObserver<T> observer3 = null; 
    do 
    { 
     comparand = this._observer; 
     if (comparand == DisposedObserver<T>.Instance) 
     { 
      throw new ObjectDisposedException(""); 
     } 
     if (comparand == DoneObserver<T>.Completed) 
     { 
      observer.OnCompleted(); 
      return Disposable.Empty; 
     } 
     DoneObserver<T> observer4 = comparand as DoneObserver<T>; 
     if (observer4 != null) 
     { 
      observer.OnError(observer4.Exception); 
      return Disposable.Empty; 
     } 
     if (comparand == NopObserver<T>.Instance) 
     { 
      observer3 = observer; 
     } 
     else 
     { 
      Observer<T> observer5 = comparand as Observer<T>; 
      if (observer5 != null) 
      { 
       observer3 = observer5.Add(observer); 
      } 
      else 
      { 
       observer3 = new Observer<T>(new ImmutableList<IObserver<T>>(new IObserver<T>[] { comparand, observer })); 
      } 
     } 
    } 
    while (Interlocked.CompareExchange<IObserver<T>>(ref this._observer, observer3, comparand) != comparand); 
    return new Subscription<T>((Subject<T>) this, observer); 
} 

private void Unsubscribe(IObserver<T> observer) 
{ 
    IObserver<T> comparand = null; 
    IObserver<T> instance = null; 
Label_0004: 
    comparand = this._observer; 
    if ((comparand != DisposedObserver<T>.Instance) && !(comparand is DoneObserver<T>)) 
    { 
     Observer<T> observer4 = comparand as Observer<T>; 
     if (observer4 != null) 
     { 
      instance = observer4.Remove(observer); 
     } 
     else 
     { 
      if (comparand != observer) 
      { 
       return; 
      } 
      instance = NopObserver<T>.Instance; 
     } 
     if (Interlocked.CompareExchange<IObserver<T>>(ref this._observer, instance, comparand) != comparand) 
     { 
      goto Label_0004; 
     } 
    } 
} 

// Properties 
public bool HasObservers 
{ 
    get 
    { 
     return (((this._observer != NopObserver<T>.Instance) && !(this._observer is DoneObserver<T>)) && (this._observer != DisposedObserver<T>.Instance)); 
    } 
} 

// Nested Types 
private class Subscription : IDisposable 
{ 
    // Fields 
    private IObserver<T> _observer; 
    private Subject<T> _subject; 

    // Methods 
    public Subscription(Subject<T> subject, IObserver<T> observer) 
    { 
     this._subject = subject; 
     this._observer = observer; 
    } 

    public void Dispose() 
    { 
     IObserver<T> observer = Interlocked.Exchange<IObserver<T>>(ref this._observer, null); 
     if (observer != null) 
     { 
      this._subject.Unsubscribe(observer); 
      this._subject = null; 
     } 
    } 
} 

}

+0

Кстати, сложность здесь была частью [оптимизации производительности Rx 1.1/2.0] (http://blogs.msdn.com/b/rxteam/archive/2012/03/12/reactive-extensions- V2-0-бета-доступно-now.aspx). (§Улучшение скорости производителя). –

ответ

0

Предмет является наблюдаемой, потому что вы можете подписаться на него. Вы делаете это в своем примере (вы подписали двух подписчиков).

Предмет также наблюдатель, потому что вы можете сделать следующее:

someObservable.Subscribe(subject); 

Таким образом, ваш вопрос будет получать события от someObservable и распространять их в своих собственных абонентов.

P.S. в вашем коде вы сами вызвали метод OnNext(). Но это именно то, что сделает someObservable, когда вы подписываетесь на него со своей темой.

+0

См. Мой ответ, это как бы суммирует его. В любом случае, спасибо за помощь. – Gilad

+0

Извините, прокомментировано в неправильном месте ... –

1

Я знаю это, но меня беспокоит то, что это не имеет смысла. Я углубился в код и выяснил, что их внутренняя реализация наблюдателя содержит больше наблюдателей, см. Ниже.

И если вы проверите метод OnNext, вы увидите, что они повторяют все наблюдатели и вызывают их метод OnNext.

Теперь все имеет смысл для меня, я понял логику, но не мог понять, где она была реализована.

internal class Observer<T> : IObserver<T> 
{ 
    private readonly ImmutableList<IObserver<T>> _observers; 

    public Observer(ImmutableList<IObserver<T>> observers) 
    { 
     this._observers = observers; 
    } 

    internal IObserver<T> Add(IObserver<T> observer) 
    { 
     return new Observer<T>(this._observers.Add(observer)); 
    } 

    public void OnCompleted() 
    { 
     foreach (IObserver<T> observer in this._observers.Data) 
     { 
      observer.OnCompleted(); 
     } 
    } 

    public void OnError(Exception error) 
    { 
     foreach (IObserver<T> observer in this._observers.Data) 
     { 
      observer.OnError(error); 
     } 
    } 

    public void OnNext(T value) 
    { 
     foreach (IObserver<T> observer in this._observers.Data) 
     { 
      observer.OnNext(value); 
     } 
    } 

    internal IObserver<T> Remove(IObserver<T> observer) 
    { 
     int index = Array.IndexOf<IObserver<T>>(this._observers.Data, observer); 
     if (index < 0) 
     { 
      return this; 
     } 
     if (this._observers.Data.Length == 2) 
     { 
      return this._observers.Data[1 - index]; 
     } 
     return new Observer<T>(this._observers.Remove(observer)); 
    } 
}