2013-04-18 2 views
1

Я играл с linq/rx и хотел сделать что-то простое, создав поток, который прослушивает два разных потока чисел, и когда либо числовой поток получает новое значение, выводят суммы.Объединение двух потоков чисел с использованием linq или rx

Я попытался ниже:

static void Main(string[] args) 
{ 
    var numSrcA = new Subject<int>(); 
    var numSrcB = new Subject<int>(); 

    var resultsLinq = from a in numSrcA 
         from b in numSrcB 
         select a + b; 

    var resultsRx = numSrcA.SelectMany(a => numSrcB, (a, b) => a + b); 

    resultsLinq.Subscribe(r => Console.WriteLine("Linq: " + r)); 
    resultsRx.Subscribe(r => Console.WriteLine("Rx: " + r)); 

    numSrcA.OnNext(1); 
    numSrcB.OnNext(2); 
    numSrcA.OnNext(3); 

    Console.ReadLine(); 
} 

Выход:

Linq: 3 
Rx: 3 

я ожидал:

Linq: 3 
Rx: 3 
Linq: 5 
Rx: 5 

Хотя, кажется, что новая сумма будет рассчитываться только тогда, когда numSrcB получает новое значение (в этом случае оно суммирует это новое значение против всех существующих значений в numSrcA).

Может ли это быть изменено так, чтобы дополнение к numSrc пересчитало суммы?

ответ

1

Короткий ответ, вы хотите комбинатор, который не требует нового входа на всех потоков, чтобы вызвать оценку результата - другими словами, вы хотите CombineLatest:

var resultsLinq2 = Observable.CombineLatest(numSrcA, numSrcB, (a,b) => a+b);  
resultsLinq2.Subscribe(r => Console.WriteLine("Linq2: " + r)); 

Результаты в:

Linq2: 3 
Linq: 3 
Rx: 3 
Linq2: 5 
Смежные вопросы