У меня есть приложение, которое в некоторых точках вызывает 1000 событий почти в одно и то же время. То, что я хотел бы сделать, состоит в том, чтобы выгрузить события в куски из 50 элементов и начать обрабатывать их каждые 10 секунд. Нет необходимости ждать завершения партии до начала новой пакетной обработки.Реактивные расширения: обрабатывать события партиями + добавить задержку между каждой партией
Например:
10:00:00: 10000 new events received
10:00:00: StartProcessing (events.Take(50))
10:00:10: StartProcessing (events.Skip(50).Take(50))
10:00:15: StartProcessing (events.Skip(100).Take(50))
Любые идеи, как этого добиться? Я полагаю, что Reactive Extensions - это путь, но другие решения также приемлемы.
Я пытался начать отсюда:
var bufferedItems = eventAsObservable
.Buffer(15)
.Delay(TimeSpan.FromSeconds(5)
Но заметил, что задержка не работает, как я надеялся, а вместо этого все партии начали одновременно, хотя 5 секунд с задержкой.
Я также тестировал Window-метод, но я не заметил никакой разницы в поведении. Я полагаю, что TimeSpan в окно на самом деле означает, что «принимать каждое событие, которое происходит в течение следующих 10 секунд:.
var bufferedItems = eventAsObservable
.Window(TimeSpan.FromSeconds(10), 5)
.SelectMany(x => x)
.Subscribe(DoProcessing);
Я использую Rx-Main 2.0.20304 бета-
Жаль, что я не смог бы поддержать это более одного раза! Просто провел три часа, пытаясь понять это. –