Вот идиоматический способ сделать это в Rx, как метод расширения - объяснение и пример использования вашего сценарий следующим образом.
Желаемая функция работает так же, как Observable.Throttle
, но испускает квалификационные события, как только они поступают, а не задерживается на время дросселя или периода выборки. Для заданной продолжительности после отборочного события, последующие события подавляются:
public static IObservable<T> SampleFirst<T>(
this IObservable<T> source,
TimeSpan sampleDuration,
IScheduler scheduler = null)
{
scheduler = scheduler ?? Scheduler.Default;
return source.Publish(ps =>
ps.Window(() => ps.Delay(sampleDuration,scheduler))
.SelectMany(x => x.Take(1)));
}
Идея заключается в том, чтобы использовать перегрузку окна, что создает непересекающееся окно с использованием windowClosingSelector, который использует источник время сдвигается назад на sampleDuration , Поэтому каждое окно будет: (a) закрыто первым элементом в нем и (b) оставаться открытым до тех пор, пока не будет разрешен новый элемент. Затем мы просто выбираем первый элемент из каждого окна.
В следующем примере я повторил точно ваш тестовый сценарий, моделирующий один «тире», как 100 тиков. Обратите внимание, что задержка указана как 499 тиков, а не 500 из-за разрешения проходящих событий между несколькими планировщиками, вызывающими 1 тиковый дрейф - на практике вам не нужно будет останавливаться на этом, поскольку разрешения на один тик вряд ли будут значимыми. Методы ReactiveTest
класса и OnNext
хелперы сделаны доступными путем включения основы тестирования Rx NuGet пакет rx-testing
:
public class Tests : ReactiveTest
{
public void Scenario()
{
var scheduler = new TestScheduler();
var test = scheduler.CreateHotObservable<int>(
// set up events as per the OP scenario
// using 1 dash = 100 ticks
OnNext(200, 1),
OnNext(400, 2),
OnNext(500, 3),
OnNext(800, 4),
OnNext(900, 5),
OnNext(1500, 6),
OnNext(1600, 7),
OnNext(1700, 8),
OnNext(1800, 9),
OnNext(1900, 0),
OnNext(2000, 1),
OnNext(2100, 2),
OnNext(2200, 3),
OnNext(2300, 4)
);
test.SampleFirst(TimeSpan.FromTicks(499), scheduler)
.Timestamp(scheduler)
.Subscribe(x => Console.WriteLine(
"Time: {0} Value: {1}", x.Timestamp.Ticks, x.Value));
scheduler.Start();
}
}
Обратите внимание, что выход по вашему сценарию:
Time: 200 Value: 1
Time: 800 Value: 4
Time: 1500 Value: 6
Time: 2000 Value: 1
возможного дубликата [Rx: Как Я отвечаю немедленно и обрабатываю последующие запросы] (http://stackoverflow.com/questions/7999503/rx-how-can-i-respond-immediately-and-throttle-subsequent-requests) –
@JamesWorld Это точно такой же вопрос , благодаря! Я проголосовал за закрытие. – pauloya
@JamesWorld Correction: Это не совсем тот же вопрос, я хочу, чтобы мой период времени начинался с первого элемента. В то время как другой вопрос просто хочет получить значение каждую секунду. В моем примере «6» немного поэтапно отменено, что делает последнее число «1», вместо «9» – pauloya