Эта версия поддерживает несколько вопросов:
Там в состояние гонки, что может привести к потере событий. Если наблюдатель наблюдает за событием в одном потоке, в то время как наблюдаемый источник создает нового наблюдателя в другом потоке, если вы не используете какую-либо синхронизацию, вы можете в конечном итоге вызвать OnCompleted
на текущего наблюдателя в одном потоке непосредственно перед тем, как другой поток вызовет OnNext
на том же наблюдателе. Это приведет к потере события.
Относительно вышесказанного, по умолчанию наблюдатели не являются потокобезопасными. Вы никогда не должны одновременно обращаться к наблюдателю, иначе вы нарушите первичный контракт Rx. Без блокировки абонент может вызвать OnCompleted
на currentObserver
, а другой поток вызывает OnNext
на том же самом наблюдателе. Из-за этого подобные вещи можно решить, используя синхронизированный объект. Но так как нам нужна синхронизация и для предыдущей проблемы, мы можем просто использовать простой мьютекс.
Нам нужен способ отписаться от наблюдаемого источника. Я предполагаю, что когда результирующий наблюдатель будет завершен (или ошибочен), это подходящее время для отказа от подписки на источник, поскольку нашему наблюдателю было сказано ожидать больше событий.
Вот код:
public static IObserver<T> Switch<T>(this IObservable<IObserver<T>> source)
{
var mutex = new object();
var current = Observer.Create<T>(x => {});
var subscription = source.Subscribe(o =>
{
lock (mutex)
{
current.OnCompleted();
current = o;
}
});
return Observer.Create<T>(
onNext: v =>
{
lock(mutex)
{
current.OnNext(v);
}
},
onCompleted:() =>
{
subscription.Dispose();
lock (mutex)
{
current.OnCompleted();
}
},
onError: e =>
{
subscription.Dispose();
lock (mutex)
{
current.OnError(e);
}
});
}
Вы действительно думаете, что Interlocked.CompareExchange требуется? Какое состояние гонки вы защищаете. Справочная копия является атомарной в .net. http://stackoverflow.com/questions/5816939/are-reference-assignment-and-reading-atomic-operations – bradgonesurfing
Однако удаление подписки будет необходимо по мере добавления. – bradgonesurfing
На самом деле я просто понял, что CompareExchange недостаточно , Есть 2 условия гонки: во-первых, ваш наблюдатель, скорее всего, наблюдает за событиями в другой теме, чем ваш наблюдаемый создает новых наблюдателей. Без какой-либо защиты, когда ваш наблюдатель наблюдает за событием, он может захватить старый «currentObserver» так же, как он изменился, и отправить событие неверному наблюдателю.Другая проблема заключается в том, что он может отправить событие * после *, которое вы завершили наблюдателем, что означает, что событие будет потеряно. Я буду обновлять свой ответ, чтобы использовать фактический замок. – Brandon