2013-03-06 5 views
4

У меня есть наблюдаемое, что я подписываюсь. Этот obsevable будет возвращать объект, у которого есть свойство ActivationType, которое может быть установлено несколько раз.Дроссель только в том случае, если встречается определенное условие

То, что я пытаюсь достичь, - это регистрировать сообщение всякий раз, когда для параметра ActivationType установлено значение «Type1». Однако, если для параметра ActivationType установлено значение «Тип2», запишите сообщение только один раз и подождите 30 секунд, прежде чем регистрировать снова, если ActivationType - «Тип2».

Так что, если у меня есть:

myObservable 
    .Where(o => o.ActivationType == "Type1" || o.ActivationType == "Type2") //listen for types 1 and 2 
    .Throttle() // ??? somehow only throttle if we are currently looking at Type2 
    .Subscribe(Log); //log some stuff 

Я считаю, что дроссельная заслонка() является то, что я ищу, но я не знаю, как вызвать его условно.

Любые предложения?

+0

Простой 'Throttle', вероятно, не то, что вы хотите -' Throttle' очень похож на «Window», поскольку он будет запускаться один раз в конце каждого периода (в вашем случае 30 секунд), поэтому любой " Type2 "будут задерживаться до конца каждого окна. – JerKimball

+0

@JerKimball: Я вижу. Я еще новичок в Rx. Что мне следует использовать, если я просто хочу запускать первый раз без каких-либо задержек, а затем игнорировать следующие события Type2 до 30 секунд. (после этого просто запускается еще раз, если необходимо)? – Flack

ответ

6

Ах, идеальный случай для почти невозможно для понимания Window оператора!

EDIT: я отправляю эту ссылку, как дюжину раз в месяц, я клянусь - лучше всего для чтения через я видел в Window, Join, Buffer, GroupJoin и т.д. операторов:

Lee Campbell: Rx Part 9–Join, Window, Buffer and Group Join

var source = new Subject<Thing>(); 

var feed = source.Publish().RefCount(); 
var ofType1 = feed.Where(t => t.ActivationType == "Type1"); 
var ofType2 = feed 
    // only window the type2s 
    .Where(t => t.ActivationType == "Type2") 
    // our "end window selector" will be a tick 30s off from start 
    .Window(() => Observable.Timer(TimeSpan.FromSeconds(30))) 
    // we want the first one in each window... 
    .Select(lst => lst.Take(1)) 
    // moosh them all back together 
    .Merge(); 

    // We want all "type 1s" and the buffered outputs of "type 2s" 
    var query = ofType1.Merge(ofType2); 

    // Let's set up a fake stream of data 
    var running = true; 
    var feeder = Task.Factory.StartNew(
     () => { 
     // until we say stop... 
     while(running) 
     { 
      // pump new Things into the stream every 500ms 
      source.OnNext(new Thing()); 
      Thread.Sleep(500); 
     } 
    }); 

    using(query.Subscribe(Console.WriteLine)) 
    {    
     // Block until we hit enter so we can see the live output 
     // from the above subscribe 
     Console.ReadLine(); 
     // Shutdown our fake feeder 
     running = false; 
     feeder.Wait(); 
    } 
+0

Спасибо за образец и ссылку. Очень полезно. Кажется, он работает как ожидалось, но у меня есть быстрый вопрос. Я внес некоторые незначительные изменения в ваш образец. Я изменил промежуток времени от 30 секунд до 5 и во время цикла фидера, я изменил время Thread.Sleep от 500 до TimeSpan.FromSeconds (4). Теперь, если я Console.WriteLine текущее время при использовании (query.Subscribe (Console.WriteLine)), я вижу строку каждые 4 секунды, когда я ожидаю, что она будет каждые 5 секунд. Вы знаете, почему это так? – Flack

+0

@Flack «Фидер» - это просто что-то, чтобы имитировать ваш реальный поток - код до «var running = true;» - это важный материал; Вы бы заменили 'source = ...' своим потоком и вообще ломали фидер. – JerKimball

+0

Да, в моем коде я использую свои текущие потоки. Я просто возился с вашим образцом, чтобы лучше понять функциональность Window.Поэтому, изменив некоторые значения, я смутился, думая, что я увижу только 1 обновление max каждые 5 секунд, но вместо этого вы увидите обновления каждые 4 секунды вместо 5. – Flack

2

Почему бы просто не использовать два потока?

var baseStream = myObservable.Publish().RefCount(); // evaluate once 
var type1 = baseStream.Where(o => o.ActivationType == "Type1"); 
var type2 = baseStream.Where(o => o.ActivationType == "Type2").Throttle(TimeSpan.FromSeconds(30)); 

type1.Merge(type2).Subscribe(Log); 
+0

Одна проблема с использованием 'Throttle' (в комментарии к OP) - первое получение« Type2 »будет задерживаться окном дроссельной заслонки, что может не быть желательным поведением. – JerKimball

+0

Спасибо Алекс. Я провел некоторое тестирование, и, как указал Джерримбол, первое получение Type2 не происходит до 30 секунд. (при условии, что никакие последующие Type2 не прибудут и не закроют период дроссельной заслонки) – Flack

+0

Хорошая точка. В качестве замены, как насчет '.Buffer (TimeSpan.FromSeconds (30)). Где (list => list.Any()). Выберите (list => list.Last())'? – AlexFoxGill

Смежные вопросы