2014-11-19 3 views
1

Rx с внешними состояниями?Rx с внешними состояниями

Итак, в этом примере есть функциональность Rx, объединенная с полным поведением внешнего состояния. Каков наилучший подход с Rx для достижения этого?

Проблемные кодовые места с «updateActive».

public enum Source 
{ 
    Observable1, 
    Observable2, 
} 

// Type is Source.Observable1 
IObservable<Source> observable1; 

// Type is Source.Observable2 
IObservable<Source> observable2; 

var mergedObservables = observable1.Select(x => Source.Observable1) 
    .Merge(observable2.Select(x => Source.Observable2)); 

var updateActive = false; 

mergedObservables.Subscribe(x => 
{ 
    switch (x.Source) 
    { 
     case Source.Observable1: 
     { 
      if (updateActive) 
       break; 

      updateActive = true; 

      // Here is some code which causes that observable2 will get some new values. 
      // (this coud be also on an other thread) 
      // If this is the case, the new value(s) should be ignored. 

      updateActive = false; 
     } 
     break; 

     case Source.Observable2: 
     { 
      if (updateActive) 
       break; 

      updateActive = true; 

      // Here is some code which causes that observable1 will get some new values. 
      // (this coud be also on an other thread) 
      // If this is the case, the new value(s) should be ignored. 

      updateActive = false; 
     } 
     break; 
    } 
}); 

Примечание: Как я могу транспортировать состояние «updateActive» в операторах Rx

+4

Почему вы объединяете эти два потока, если затем вы разделите логику с помощью инструкции switch? – galenus

+0

Это была первоначальная идея объединить их, чтобы их можно было обрабатывать через Rx-операторы. Если бы было 2 отдельных потока, то это точно как обычная обработка событий, и я хочу избежать этого. Чтобы уточнить вопрос: как я могу транспортировать состояние updateActive в операторы Rx. – BIQAS

+0

Технически этот подход является вполне разумным, если вы блокируете 'updateActive' в правильных местах. Как выглядит пропущенный код? Является ли текущая работа представленной объектом 'Task'? Если это так, я мог бы подумать о том, как это сделать. –

ответ

0

Вы могли бы использовать экземпляр Subject<bool> как государственный перевозчик.

Дано:

Subject<bool> isUpdating = new Subject<bool>(); 

Вы будете использовать его в чем-то вроде этого:

var flaggedObservables = mergedObservables 
    .CombineLatest(isUpdating, (obs, flag) => new {obs, IsUpdating = flag}); 

flaggedObservables 
    .Where(data => !data.IsUpdating) 
    .Select(data => data.obs) 
    .DistinctUntilChanged() 
    .Subscribe(
     obs => 
     { 
      isUpdating.OnNext(true); 

      //do some work on obs 

      isUpdating.OnNext(false); 
     }); 
+0

Я пробовал то, что вы предложили, но это закончилось бесконечным циклом. – BIQAS

+0

Да, извините. Попробуйте это с вызовом 'DistinctUntilChanged', как он теперь появляется. – galenus

+0

@Saqib: Это решает проблему изменчивой переменной 'updateActive', однако, поскольку Эрик Мейер помещает ее в https://social.msdn.microsoft.com/Forums/en-US/bbf87eea-6a17-4920-96d7- 2131e397a234/why-do-emeijer-not-like-subject, «[Subjects] являются« изменяемыми переменными »в мире Rx». –

0

Если вы получили Task, который представляет обновление, это действительно может работать (хотя я не знаю, если я правильно записал вашу желаемую семантику, потому что из вашего начального сообщения кажется, что вы все равно игнорируете все элементы из источника).

public enum Source 
{ 
    Observable1, 
    Observable2, 
} 

// Type is Source.Observable1 
IObservable<Source> observable1; 

// Type is Source.Observable2 
IObservable<Source> observable2; 

var mergedObservables = observable1.Select(x => Source.Observable1) 
    .Merge(observable2.Select(x => Source.Observable2)); 

mergedObservables 
    .Scan(
     new 
     { 
      Value = (Source?)null, 
      CurrentUpdateTask = (Task)null 
     }, 
     (tuple, value) 
     { 
      if ((tuple.CurrentUpdateTask == null) || (tuple.CurrentUpdateTask.IsCompleted)) 
      { 
       // No update running. Start updating. 
       return new 
       { 
        Value = value, 
        CurrentUpdateTask = Update() //Some Task-returning method that does the update. 
       }; 
      } 

      // Update in flight. Ignore value. 
      return new 
      { 
       Value = (Source?)null, 
       tuple.CurrentUpdateTask 
      }; 
     }) 
    .Where(tuple => tuple.Value.HasValue) 
    .Select(tuple.Value.Value) 
    .Subscribe(...); 
+0

Это также будет производить бесконечные петли из-за перекрестной обратной связи в наблюдение. – BIQAS

+0

Что вызывает обратную связь? –

+0

Извините, я должен исправить, он «мог» производить бесконечные петли. Потому что у вас недостаточно информации о том, как 2 объединенных наблюдаемых источника связаны друг с другом. Например, вы находитесь в контексте потока от наблюдаемого1 и в методе обновления могут быть получены некоторые данные, которые будут влиять на поток наблюдаемого2. – BIQAS

0

Просто добавьте Where пункты к вашему observable1 и observable2. Используйте System.Threading.Interlocked, чтобы обеспечить значение isActive, распространяемое на другие темы. Обратите внимание, что всегда существует условие гонки, в котором значение может быть достигнуто одновременно с одновременным наблюдением. Оба закончили выполнение, хотя и не одновременно. Этот код останавливает только те значения, которые были получены, которые были сгенерированы, пока значение было истинным.

// Тип Source.Observable1 IObservable observable1;

// Type is Source.Observable2 
IObservable<Source> observable2; 

int isActive = 0; 

var mergedObservables = Observable.Merge(
    observable1 
     .Where(t => Interlocked.CompareExchange(ref isActive, 1, 2) == 0) 
     .Select(x => Source.Observable1), 
    observable2 
     .Where(t => Interlocked.CompareExchange(ref isActive, 1, 2) == 0) 
     .Select(x => Source.Observable2)); 

mergedObservables.Subscribe(x => 
{ 
    switch (x.Source) 
    { 
     case Source.Observable1: 
     { 
      Interlocked.Exchange(ref isActive, 1); 

      // Here is some code which causes that observable2 will get some new values. 
      // (this coud be also on an other thread) 
      // If this is the case, the new value(s) should be ignored. 

      Interlocked.Exchange(ref isActive, 0); 
     } 
     break; 

     case Source.Observable2: 
     { 
      Interlocked.Exchange(ref isActive, 1); 

      // Here is some code which causes that observable1 will get some new values. 
      // (this coud be also on an other thread) 
      // If this is the case, the new value(s) should be ignored. 

      Interlocked.Exchange(ref isActive, 0); 
     } 
     break; 
    } 
}); 
Смежные вопросы