2016-05-18 3 views
0

Я использую следующий код здесь - выглядит как вопрос мне в расчистке «кэша Replay»Rx Cache - Replay оператор Clear

https://gist.github.com/leeoades/4115023

Если изменить следующий вызов и код я вижу, что в Replay есть ошибка, т. е. она никогда не очищается. Может кто-нибудь помочь, чтобы исправить это?

private Cache<string> GetCalculator() 
    { 
     var calculation = Observable.Create<string>(o => 
     { 
      _calculationStartedCount++; 

      return Observable.Timer(_calculationDuration, _testScheduler) 
          .Select(_ => "Hello World!" + _calculationStartedCount) // suffixed the string with count to test the behaviour of Replay clearing 
          .Subscribe(o); 
     }); 

     return new Cache<string>(calculation); 
    } 

[Test] 
    public void After_Calling_GetResult_Calling_ClearResult_and_GetResult_should_perform_calculation_again() 
    { 
     // ARRANGE 
     var calculator = GetCalculator(); 

     calculator.GetValue().Subscribe(); 
     _testScheduler.Start(); 

     // ACT 
     calculator.Clear(); 

     string result = null; 
     calculator.GetValue().Subscribe(r => result = r); 
     _testScheduler.Start(); 

     // ASSERT 
     Assert.That(_calculationStartedCount, Is.EqualTo(2)); 
     Assert.That(result, Is.EqualTo("Hello World!2")); // always returns Hello World!1 and not Hello World!2 
     Assert.IsNotNull(result); 
    } 

ответ

2

Проблема тонкая. Последовательность источника Timer завершается после того, как она испускает событие, которое в свою очередь вызывает OnCompleted на внутреннем ReplaySubject, созданное Replay. Когда завершается Subject, он больше не принимает никаких новых значений, даже если появляется новый Observable.

Когда вы переоформить подписку на подстилающей Observable она выполняет еще раз, но не в состоянии перезапустить Subject, поэтому ваш новый Observer может получить только самое последнее значение перед ReplaySubject завершена.

Самым простым решением было бы, вероятно, просто никогда не позволять поток источника полный (непроверенные):

public Cache(IObservable<T> source) 
    { 
     //Not sure why you are wrapping this in an Observable.create 
     _source = source.Concat(Observable.Never()) 
          .Replay(1, Scheduler.Immediate); 
    }