2012-06-07 2 views
10

У меня есть приложение, которое в некоторых точках вызывает 1000 событий почти в одно и то же время. То, что я хотел бы сделать, состоит в том, чтобы выгрузить события в куски из 50 элементов и начать обрабатывать их каждые 10 секунд. Нет необходимости ждать завершения партии до начала новой пакетной обработки.Реактивные расширения: обрабатывать события партиями + добавить задержку между каждой партией

Например:

10:00:00: 10000 new events received 
10:00:00: StartProcessing (events.Take(50)) 
10:00:10: StartProcessing (events.Skip(50).Take(50)) 
10:00:15: StartProcessing (events.Skip(100).Take(50)) 

Любые идеи, как этого добиться? Я полагаю, что Reactive Extensions - это путь, но другие решения также приемлемы.

Я пытался начать отсюда:

 var bufferedItems = eventAsObservable 
      .Buffer(15) 
      .Delay(TimeSpan.FromSeconds(5) 

Но заметил, что задержка не работает, как я надеялся, а вместо этого все партии начали одновременно, хотя 5 секунд с задержкой.

Я также тестировал Window-метод, но я не заметил никакой разницы в поведении. Я полагаю, что TimeSpan в окно на самом деле означает, что «принимать каждое событие, которое происходит в течение следующих 10 секунд:.

 var bufferedItems = eventAsObservable 
      .Window(TimeSpan.FromSeconds(10), 5) 
      .SelectMany(x => x) 
      .Subscribe(DoProcessing); 

Я использую Rx-Main 2.0.20304 бета-

ответ

20

Если вы не хотите спать темы, вы можете сделать это:

var tick = Observable.Interval(TimeSpan.FromSeconds(5)); 

eventAsObservable 
.Buffer(50) 
.Zip(tick, (res, _) => res) 
.Subscribe(DoProcessing); 
+1

Жаль, что я не смог бы поддержать это более одного раза! Просто провел три часа, пытаясь понять это. –

1

Попробуйте это:

var bufferedItems = eventAsObservable 
     .Buffer(50) 
     .Do(_ => { Thread.Sleep(10000); }); 
Смежные вопросы