2013-03-20 4 views
0

Мне нужно развенчать поток ввода.Входы для разблокировки Rx

При первом вхождении состояния 1 мне нужно подождать 5 секунд и проверить, было ли в последнем состоянии также 1. Только у меня стабильный сигнал.

(time) 0-1-2-3-4-5-6-7-8-9 
(state) 0-0-0-0-0-1-0-1-0-1 
(result)     -> 1 

Ниже приведен пример нестабильного сигнала.

(time) 0-1-2-3-4-5-6-7-8-9 
(state) 0-0-0-0-0-1-0-1-0-0 
(result)     -> 0 

Я попытался использовать буфер, но буфер фиксированный начальную точку, и мне нужно ждать 5 секунд, начиная с моим первым событием.

+0

Что бы вы хотели, чтобы результат был следующим, если следующие состояния в примере 2 были 0-1? –

+0

Является ли выход произведенным каждую секунду? Я согласен с JerKilmball ниже, дополнительная информация по вашему случаю использования будет полезна – AlexFoxGill

ответ

3

Принимая ваши требования буквально

при первом появлении состояния 1 мне нужно подождать 5 секунд и проверить, если Laste состояние также 1. только чем у меня есть стабильный сигнал.

Я могу придумать несколько способов решить эту проблему. Чтобы уточнить мои предположения, вы просто хотите нажать последнее значение, полученное через 5 секунд после первого вхождения в 1. Это приведет к одной последовательности значений, создающей либо 0, либо 1 (т. Е. Независимо от каких-либо дополнительных значений, полученных в прошлом 5 секунд от исходной последовательности)

Здесь я воссоздаю вам последовательность с помощью какого-нибудь jiggery-pokery.

var source = Observable.Timer(TimeSpan.Zero,TimeSpan.FromSeconds(1)) 
    .Take(10) 
    .Select(i=>{if(i==5 || i==7 || i==9){return 1;}else{return 0;}}); //Should produce 1; 
    //.Select(i=>{if(i==5 || i==7){return 1;}else{return 0;}}); //Should produce 0; 

Все приведенные ниже варианты представлены для совместного использования последовательности. Чтобы безопасно передавать последовательность в Rx, мы публикуем() и подключаем ее. Я использую автоматическое соединение с помощью оператора RefCount().

var sharedSource = source.Publish().RefCount(); 

1) В этом решении мы возьмем первое значение 1, а затем буфер выбранных значений последовательности в буфере, чтобы размеры 5 секунд. Мы берем только первый из этих буферов. Как только мы получим этот буфер, мы получим последнее значение и нажмите его. Если буфер пуст, я предполагаю, что мы нажимаем один, поскольку последним значением было «1», которое запустило запуск буфера.

sharedSource.Where(state=>state==1) 
      .Take(1) 
      .SelectMany(_=>sharedSource.Buffer(TimeSpan.FromSeconds(5)).Take(1)) 
      .Select(buffer=> 
      { 
       if(buffer.Any()) 
       { 
        return buffer.Last(); 
       } 
       else{ 
        return 1; 
       } 
      }) 
      .Dump(); 

2) В этом решении я беру подход только начать слушать, как только мы получим действительное значение (1), а затем принимать все значения, пока таймер не вызывает прекращение. Отсюда мы берем последнее полученное значение.

var fromFirstValid = sharedSource.SkipWhile(state=>state==0); 
fromFirstValid 
    .TakeUntil(
     fromFirstValid.Take(1) 
        .SelectMany(_=>Observable.Timer(TimeSpan.FromSeconds(5)))) 
    .TakeLast(1) 
    .Dump(); 

3) В этом решении я использовать оператор окна, чтобы создать единое окно, которое открывается, когда первое значение «1», а затем происходит закрывается, когда 5 секунд истекут. Снова мы просто берем последнее значение

sharedSource.Window(
       sharedSource.Where(state=>state==1), 
       _=>Observable.Timer(TimeSpan.FromSeconds(5))) 
      .SelectMany(window=>window.TakeLast(1)) 
      .Take(1) 
      .Dump(); 

Так много разных способов кошки-кошки.

+0

+1 - Хорошее распространение ответов! Знаете, я вставляю ссылку на ваш пост в Window/Join/etc примерно три раза в кровавую неделю ... :) – JerKimball

+0

Woohoo !! Я уверен, что это дает вам ценность. –

+0

Отлично! Я использовал решение 2. Если вы не используете Rx каждый день, сложно переключить свой мозг в Rx-Mode. Благодаря! –

1

Это звучит (на первый взгляд), как вы хотите Throttle, не Buffer, хотя еще некоторая информация о ваших случаях использование помогло бы придавить, что вниз - во всяком случае, вот как вы можете Throttle ваш поток:

void Main() 
{ 
    var subject = new Subject<int>(); 
    var source = subject.Publish().RefCount(); 

    var query = source 
     // Start counting on a 1, wait 5 seconds, and take the last value 
     .Throttle(x => Observable.Timer(TimeSpan.FromSeconds(5))); 

    using(query.Subscribe(Console.WriteLine)) 
    { 
     // This sequence should produce a one 
     subject.OnNext(1); 
     subject.OnNext(0); 
     subject.OnNext(1); 
     subject.OnNext(0); 
     subject.OnNext(1); 
     subject.OnNext(1); 
     Console.ReadLine(); 
     // This sequence should produce a zero 
     subject.OnNext(0); 
     subject.OnNext(0); 
     subject.OnNext(0); 
     subject.OnNext(0); 
     subject.OnNext(1); 
     subject.OnNext(0); 
     Console.ReadLine(); 
    } 
} 
Смежные вопросы