2015-03-04 3 views
0

Я пытаюсь «дросселировать» IObservable в (как я полагаю) другим способом стандартных методов дроссельной заслонки.
Я хочу игнорировать значения для 1s после первого игнорируемого значения в потоке.IObservable - Игнорировать новые элементы за промежуток времени

Например, если 1s = 5 тире

source: --1-23--45-----6789
result: --1-----4------6----1--- 

Любые идеи о том, как достичь этого?

+0

возможного дубликата [Rx: Как Я отвечаю немедленно и обрабатываю последующие запросы] (http://stackoverflow.com/questions/7999503/rx-how-can-i-respond-immediately-and-throttle-subsequent-requests) –

+0

@JamesWorld Это точно такой же вопрос , благодаря! Я проголосовал за закрытие. – pauloya

+0

@JamesWorld Correction: Это не совсем тот же вопрос, я хочу, чтобы мой период времени начинался с первого элемента. В то время как другой вопрос просто хочет получить значение каждую секунду. В моем примере «6» немного поэтапно отменено, что делает последнее число «1», вместо «9» – pauloya

ответ

2

Вот идиоматический способ сделать это в Rx, как метод расширения - объяснение и пример использования вашего сценарий следующим образом.

Желаемая функция работает так же, как Observable.Throttle, но испускает квалификационные события, как только они поступают, а не задерживается на время дросселя или периода выборки. Для заданной продолжительности после отборочного события, последующие события подавляются:

public static IObservable<T> SampleFirst<T>(
    this IObservable<T> source, 
    TimeSpan sampleDuration, 
    IScheduler scheduler = null) 
{ 
    scheduler = scheduler ?? Scheduler.Default; 
    return source.Publish(ps => 
     ps.Window(() => ps.Delay(sampleDuration,scheduler)) 
      .SelectMany(x => x.Take(1))); 
} 

Идея заключается в том, чтобы использовать перегрузку окна, что создает непересекающееся окно с использованием windowClosingSelector, который использует источник время сдвигается назад на sampleDuration , Поэтому каждое окно будет: (a) закрыто первым элементом в нем и (b) оставаться открытым до тех пор, пока не будет разрешен новый элемент. Затем мы просто выбираем первый элемент из каждого окна.

В следующем примере я повторил точно ваш тестовый сценарий, моделирующий один «тире», как 100 тиков. Обратите внимание, что задержка указана как 499 тиков, а не 500 из-за разрешения проходящих событий между несколькими планировщиками, вызывающими 1 тиковый дрейф - на практике вам не нужно будет останавливаться на этом, поскольку разрешения на один тик вряд ли будут значимыми. Методы ReactiveTest класса и OnNext хелперы сделаны доступными путем включения основы тестирования Rx NuGet пакет rx-testing:

public class Tests : ReactiveTest 
{ 
    public void Scenario() 
    { 
     var scheduler = new TestScheduler(); 
     var test = scheduler.CreateHotObservable<int>( 
      // set up events as per the OP scenario 
      // using 1 dash = 100 ticks   
      OnNext(200, 1), 
      OnNext(400, 2), 
      OnNext(500, 3), 
      OnNext(800, 4), 
      OnNext(900, 5), 
      OnNext(1500, 6), 
      OnNext(1600, 7), 
      OnNext(1700, 8), 
      OnNext(1800, 9), 
      OnNext(1900, 0), 
      OnNext(2000, 1), 
      OnNext(2100, 2), 
      OnNext(2200, 3), 
      OnNext(2300, 4) 
      ); 

     test.SampleFirst(TimeSpan.FromTicks(499), scheduler) 
      .Timestamp(scheduler) 
      .Subscribe(x => Console.WriteLine(
       "Time: {0} Value: {1}", x.Timestamp.Ticks, x.Value)); 

     scheduler.Start(); 

    } 
} 

Обратите внимание, что выход по вашему сценарию:

Time: 200 Value: 1 
Time: 800 Value: 4 
Time: 1500 Value: 6 
Time: 2000 Value: 1 
+0

Это замечательно! Определенно делает то, что требуется. Вопрос, будет ли он работать, если .Window() получил TimeSpan вместо задержки? – pauloya

+0

Он работает, если вы это делаете, но вы получаете постоянно создаваемые окна. Это не влияет на выход, но влияет на профиль производительности. Вы можете увидеть влияние, если вы попробуете вышеуказанный тест с измененным кодом, чтобы использовать простой «TimeSpan».Тест никогда не завершится, если вы не добавите «OnCompleted» в исходный код. Вот почему я использовал этот механизм - поток не «занят», когда источник тихо. –

0

Это должно сделать трюк. Возможна более короткая реализация.

скапливаются в Scan магазинах Timestamp последнего держали Item и помечает ли Keep каждый элемент.

public static IObservable<T> RateLimit<T>(this IObservable<T> source, TimeSpan duration) 
{ 
    return observable 
     .Timestamp() 
     .Scan(
      new 
      { 
       Item = default(T), 
       Timestamp = DateTimeOffset.MinValue, 
       Keep = false 
      }, 
      (a, x) => 
      { 
       var keep = a.Timestamp + duration <= x.Timestamp; 
       return new 
       { 
        Item = x.Value, 
        Timestamp = keep ? x.Timestamp : a.Timestamp, 
        Keep = keep 
       }; 
      } 
     }) 
     .Where(a => a.Keep) 
     .Select(a => a.Item); 
} 
Смежные вопросы