2017-02-16 9 views
0

Я имею некоторую наблюдаемую последовательность, например:Выполните действия после всех наблюдателей полностью на наблюдаемых последовательности

var period = TimeSpan.FromSeconds(0.5); 
var observable = Observable 
    .Interval(period) 
    .Publish() 
    .RefCount(); 

Я хочу, чтобы выполнить некоторые трудные вычисления для элементов этой последовательности в фоновом потоке, а также выполнить некоторое окончательное решение когда все вычисления выполнены. Поэтому я хочу что-то вроде этого:

observable.ObserveOn(Scheduler.Default).Subscribe(i => ComplexComputation1(i)); 
observable.ObserveOn(Scheduler.Default).Subscribe(i => ComplexComputation2(i)); 
// next observer must be called only after ComplexComputation1/2 complete on input i 
observable.Subscribe(i => FinalAction(i)); 

Могу ли я сделать это в Rx? Или, может быть, это нарушает некоторые принципы реактивного программирования, и я должен использовать другой подход в такой ситуации?

ответ

2

Очень опасно иметь упорядоченные по порядку последовательности в реактивных шаблонах.

Одна вещь, которую вы можете сделать, состоит в том, чтобы каждый комплексный расчет излучал событие после его завершения. Затем у вас может быть потребитель-потребитель, который выполнит свой расчет, как только он получит сообщения о завершении предыдущих шагов.


Другим возможным решением является создание конкретного блока последовательности, который запускается регулярно. Это уменьшает параллелизуемость решения.

observable.ObserveOn(Scheduler.Default).Subscribe(i => 
{  
    ComplexComputation1(i)); 
    ComplexComputation2(i)); 
    FinalAction(i); 
} 
+0

Я думал об этом подходе. Но на самом деле, я не знаю нескольких наблюдателей, которые обрабатывают каждый элемент. Может ли это решение применяться к такой ситуации? –

+0

В качестве возможного решения я добавил подход блока последовательностей. –

1

Чтобы проверить это, я создал следующие методы, чтобы помочь проиллюстрировать последовательность событий:

public void ComplexComputation1(long i) 
{ 
    Console.WriteLine("Begin ComplexComputation1"); 
    Thread.Sleep(100); 
    Console.WriteLine("End ComplexComputation1"); 
} 

public void ComplexComputation2(long i) 
{ 
    Console.WriteLine("Begin ComplexComputation2"); 
    Thread.Sleep(100); 
    Console.WriteLine("End ComplexComputation2"); 
} 

public void FinalAction(long i) 
{ 
    Console.WriteLine("Begin FinalAction"); 
    Thread.Sleep(100); 
    Console.WriteLine("End FinalAction"); 
} 

Ваш оригинальный код выбежала так:

 
Begin FinalAction 
Begin ComplexComputation1 
Begin ComplexComputation2 
End ComplexComputation2 
End FinalAction 
End ComplexComputation1 
Begin FinalAction 
Begin ComplexComputation1 
Begin ComplexComputation2 
End FinalAction 
End ComplexComputation2 
End ComplexComputation1 
Begin FinalAction 
Begin ComplexComputation1 
Begin ComplexComputation2 
End ComplexComputation2 
End ComplexComputation1 
End FinalAction 
... 

Это легко провести в жизнь код для запуска последовательно в одном фоновом потоке. Просто используйте EventLoopScheduler.

var els = new EventLoopScheduler(); 

observable.ObserveOn(els).Subscribe(i => ComplexComputation1(i)); 
observable.ObserveOn(els).Subscribe(i => ComplexComputation2(i)); 
// next observer must be called only after ComplexComputation1/2 complete on input i 
observable.ObserveOn(els).Subscribe(i => FinalAction(i)); 

Это дает:

 
Begin ComplexComputation1 
End ComplexComputation1 
Begin ComplexComputation2 
End ComplexComputation2 
Begin FinalAction 
End FinalAction 
Begin ComplexComputation1 
End ComplexComputation1 
Begin ComplexComputation2 
End ComplexComputation2 
Begin FinalAction 
End FinalAction 
Begin ComplexComputation1 
End ComplexComputation1 
Begin ComplexComputation2 
End ComplexComputation2 
Begin FinalAction 
End FinalAction 

Но как только вы приведете Scheduler.Default это не работает.

Чем больше или менее простой вариант, чтобы сделать это:

var cc1s = observable.ObserveOn(Scheduler.Default).Select(i => { ComplexComputation1(i); return Unit.Default; }); 
var cc2s = observable.ObserveOn(Scheduler.Default).Select(i => { ComplexComputation2(i); return Unit.Default; }); 

observable.Zip(cc1s.Zip(cc2s, (cc1, cc2) => Unit.Default), (i, cc) => i).Subscribe(i => FinalAction(i)); 

Это работает, как ожидалось.

Вы получаете хорошую последовательность, как это:

 
Begin ComplexComputation1 
Begin ComplexComputation2 
End ComplexComputation1 
End ComplexComputation2 
Begin FinalAction 
End FinalAction 
Begin ComplexComputation2 
Begin ComplexComputation1 
End ComplexComputation2 
End ComplexComputation1 
Begin FinalAction 
End FinalAction 
Begin ComplexComputation1 
Begin ComplexComputation2 
End ComplexComputation2 
End ComplexComputation1 
Begin FinalAction 
End FinalAction 
0

Это кажется простым случаем композиции вложенного наблюдаемое раздавливания (SelectMany/Слияние/Concat) и Zip

Здесь я принял свобода предположить, что методы Long Running возвращают Task. Однако, если они этого не делают, тогда медленный синхронный метод блокировки может быть обернут Observable.Start(()=>ComplexComputation1(x)).

void Main() 
{ 
    var period = TimeSpan.FromSeconds(0.5); 
    var observable = Observable 
     .Interval(period) 
     .Publish() 
     .RefCount(); 

    var a = observable.Select(i => ComplexComputation1(i).ToObservable()) 
       .Concat(); 
    var b = observable.Select(i => ComplexComputation2(i).ToObservable()) 
       .Concat(); 

    a.Zip(b, Tuple.Create) 
     .Subscribe(pair => FinalAction(pair.Item1, pair.Item2)); 
} 

// Define other methods and classes here 
Random rnd = new Random(); 
private async Task<long> ComplexComputation1(long i) 
{ 
    await Task.Delay(rnd.Next(50, 1000)); 
    return i; 
} 
private async Task<long> ComplexComputation2(long i) 
{ 
    await Task.Delay(rnd.Next(50, 1000)); 
    return i; 
} 

private void FinalAction(long a, long b) 
{ 

}