2013-04-15 4 views
0

В реактивных расширениях мы имеемКак реализовать переключатель для IObservable <IObserver<T>> для реактивных расширений в C#

IObservable<T> Switch(this IObservable<IObservable<T>> This) 

Я хотел бы реализацию

IObserver<T> Switch(this IObservable<IObserver<T>> This) 

, который будет переключать исходящие события в различные наблюдатель но представлен как один наблюдатель.

ответ

3

Эта версия поддерживает несколько вопросов:

  • Там в состояние гонки, что может привести к потере событий. Если наблюдатель наблюдает за событием в одном потоке, в то время как наблюдаемый источник создает нового наблюдателя в другом потоке, если вы не используете какую-либо синхронизацию, вы можете в конечном итоге вызвать OnCompleted на текущего наблюдателя в одном потоке непосредственно перед тем, как другой поток вызовет OnNext на том же наблюдателе. Это приведет к потере события.

  • Относительно вышесказанного, по умолчанию наблюдатели не являются потокобезопасными. Вы никогда не должны одновременно обращаться к наблюдателю, иначе вы нарушите первичный контракт Rx. Без блокировки абонент может вызвать OnCompleted на currentObserver, а другой поток вызывает OnNext на том же самом наблюдателе. Из-за этого подобные вещи можно решить, используя синхронизированный объект. Но так как нам нужна синхронизация и для предыдущей проблемы, мы можем просто использовать простой мьютекс.

  • Нам нужен способ отписаться от наблюдаемого источника. Я предполагаю, что когда результирующий наблюдатель будет завершен (или ошибочен), это подходящее время для отказа от подписки на источник, поскольку нашему наблюдателю было сказано ожидать больше событий.

Вот код:

public static IObserver<T> Switch<T>(this IObservable<IObserver<T>> source) 
{ 
    var mutex = new object(); 
    var current = Observer.Create<T>(x => {}); 
    var subscription = source.Subscribe(o => 
    { 
     lock (mutex) 
     { 
      current.OnCompleted(); 
      current = o; 
     } 
    }); 

    return Observer.Create<T>(
     onNext: v => 
     { 
      lock(mutex) 
      {     
       current.OnNext(v); 
      } 
     }, 
     onCompleted:() => 
     { 
      subscription.Dispose(); 
      lock (mutex) 
      { 
       current.OnCompleted(); 
      } 
     }, 
     onError: e => 
     { 
      subscription.Dispose(); 
      lock (mutex) 
      { 
       current.OnError(e); 
      } 
     }); 
} 
+0

Вы действительно думаете, что Interlocked.CompareExchange требуется? Какое состояние гонки вы защищаете. Справочная копия является атомарной в .net. http://stackoverflow.com/questions/5816939/are-reference-assignment-and-reading-atomic-operations – bradgonesurfing

+0

Однако удаление подписки будет необходимо по мере добавления. – bradgonesurfing

+1

На самом деле я просто понял, что CompareExchange недостаточно , Есть 2 условия гонки: во-первых, ваш наблюдатель, скорее всего, наблюдает за событиями в другой теме, чем ваш наблюдаемый создает новых наблюдателей. Без какой-либо защиты, когда ваш наблюдатель наблюдает за событием, он может захватить старый «currentObserver» так же, как он изменился, и отправить событие неверному наблюдателю.Другая проблема заключается в том, что он может отправить событие * после *, которое вы завершили наблюдателем, что означает, что событие будет потеряно. Я буду обновлять свой ответ, чтобы использовать фактический замок. – Brandon

1
public static IObserver<T> Switch<T>(this IObservable<IObserver<T>> This) 
{ 
    IObserver<T> currentObserver = Observer.Create<T>(x => { }); 

    This.Subscribe(o => { currentObserver.OnCompleted(); currentObserver = o; }); 


    return Observer.Create<T> 
     (onNext: v => currentObserver.OnNext(v) 
     , onCompleted:() => currentObserver.OnCompleted() 
     , onError: v => currentObserver.OnError(v)); 
} 
+0

вы выпадающие всех предыдущих наблюдателей. Это нормально? –

+0

Я вызываю OnCompleted, когда бросаю предыдущее. Это намерение отказаться от предыдущих наблюдателей. – bradgonesurfing

+0

в вас сомневается, что это звучит как «переключить исходящие события на разных наблюдателей», а не (например) «переключить исходящие события на последнего наблюдателя». он немного путается (по крайней мере для меня) –

Смежные вопросы