2013-11-07 2 views
0

Я принимаю наблюдаемым от Джозефа Albahari:Как я могу получить общее количество моей наблюдаемой последовательности?

IObservable<int> GetHeartBeat() 
{ 
    return Observable.Create<int>((observer, cancelToken) => this.Start(observer, cancelToken)); 
} 

async Task Start(IObserver<int> observer, CancellationToken cancelToken) 
{ 
    int beat = 0; 
    var random = new Random(); 
    while (beat < 10) 
    { 
     await Task.Delay(random.Next(500) + 700, cancelToken); 
     observer.OnNext(beat); 
     beat++; 
    } 
} 

, а затем я запускаю это:

var observable = this.GetHeartBeat() 
    .TimeInterval() 
    .Buffer(3, 1) 
    .Select(l => l.Average(x => 60/x.Interval.TotalSeconds)); 
observable 
    .Select(i => string.Format("{0}, ", i)) 
    .Concat(observable.Count().Select(i => string.Format("{0}", i))) 
    .Subscribe(i => System.Diagnostics.Debug.WriteLine(i)); 
observable.Wait(); 

Я ожидаю увидеть это:

71.9889864709428, 1 
72.9592264002639, 2 
67.6743699542054, 3 

т.д.

но я вижу это:

71.9889864709428, 
72.9592264002639, 
67.6743699542054, 

Насколько глубоко я могу быть прав в настоящее время?

ответ

1

Я думаю, что вам это нужно:

var observable = this.GetHeartBeat() 
    .TimeInterval() 
    .Buffer(3, 1) 
    .Select((l, i) => string.Format("{0}, {1}", l.Average(x => 60/x.Interval.TotalSeconds), i)) 
    .Subscribe(i => System.Diagnostics.Debug.WriteLine(i)); 
+0

ах, выход Linq;) – rasx

1

Стандартный подход к получению нарастающего итога с Rx является использование оператора Scan.

+0

Отличное лидерство! Посмотрите на это ... – rasx

Смежные вопросы