2012-05-01 2 views
1

Я ищу расширение, такое как коммутатор, но вместо того, чтобы по умолчанию использовать самый последний поток, мне нужен коммутатор с дополнительной переменной, которая может идти назад и вперед на основе другого внешнего события.Переключение активного потока кадров

например. Мы наблюдаем, что является слиянием двух потоков A и B, когда обнаружено событие C, я хочу, чтобы наблюдаемый A прошел, и потоки от B будут проигнорированы (потеряны). Когда обнаружено событие D, я хочу, чтобы наблюдаемый поток A был удален, и B был пройден.

Какой лучший оператор linq для этого? Полагаю, я мог бы использовать комбинацию того, где и какое-то пользовательское решение, но действительно искал более элегантное решение.

Также я хотел бы избегать подписки/регенерации из потока, где это возможно. Я просто нахожу подписку/переподписку вообще очень сложно отслеживать, а также сложно связать операторов.

+0

Являются ли события 'C' и' D' выведенными из чего-то в потоках 'A' и' B' или являются ли они внешними событиями? И являются ли «А» и «В» одним и тем же типом? – yamen

+0

@yamen Это внешние события. – Alwyn

ответ

3

Есть много способов решить эту проблему, давайте посмотрим на простой. В этом примере я предполагаю, что события A и B имеют тот же тип. Я также предполагаю, что события C (привязаны к A) и D (привязаны к D) также являются наблюдаемыми без какой-либо важной информации.

var streamA = new Subject<string>(); 
var streamB = new Subject<string>(); 

var switchToA = new Subject<Unit>(); 
var switchToB = new Subject<Unit>(); 

Теперь здесь мы открываем streamA окно всякий раз, когда switchToA пожары и закрыть его, когда switchToB пожары, и сделать наоборот для streamB, а затем объединить их:

public IObservable<string> SwitchingWindows(IObservable<string> streamA, IObservable<string> streamB, IObservable<Unit> switchToA, IObservable<Unit> switchToB) 
{          
    var switchedA = streamA.Window(switchToA, _ => switchToB).Concat(); 
    var switchedB = streamB.Window(switchToB, _ => switchToA).Concat(); 

    return switchedA.Merge(switchedB); 
} 

Подписка:

var all = SwitchingWindows(streamA, streamB, switchToA, switchToB); 
    all.Subscribe(x => Console.WriteLine(x)); 

Испытание:

streamA.OnNext("a1");   // skip 
switchToA.OnNext(Unit.Default); // switch A 
streamA.OnNext("a2");   // shown 
streamB.OnNext("b1");   // skip 
streamA.OnNext("a3");   // shown 
switchToB.OnNext(Unit.Default); // switch B 
streamB.OnNext("b2");   // shown 
streamB.OnNext("b3");   // shown 
streamA.OnNext("a4");   // skip 
switchToA.OnNext(Unit.Default); // switch A 
streamA.OnNext("a5");   // shown 
streamB.OnNext("b4");   // skip 
streamB.OnNext("b5");   // skip 
switchToB.OnNext(Unit.Default); // switch B 
streamB.OnNext("b6");   // shown 
streamA.OnNext("a6");   // skip 

Выход, как и ожидалось:

a2 
a3 
b2 
b3 
a5 
b6 

Конечный поток all 100% чистыми.

+0

Спасибо, это замечательный ответ! – Alwyn

+0

Быстро вопрос: что случилось с Concat() в конце переключаемого наблюдаемого? – Alwyn

+1

Я сделал редактирование, чтобы сделать его намного проще. «Конкат» необходим, потому что функции «GroupJoin» (и теперь «Window») возвращают каждое окно или буфер как другой «IObservable», поэтому тип возвращаемого значения «IObservable >». 'Concat' выравнивает это в' IObservable '. – yamen

Смежные вопросы