2015-06-09 4 views
2

У меня есть два наблюдаемых, которые созданы из того же источника. Они дифференцируются по карте, которая присваивает рандомизированное значение свойству испускаемого элемента. Вот пример логики:Почему эти наблюдаемые RxJS производят странные выходы?

var Rx = require('rx'); 
var _ = require('lodash'); 

// create a source that emits a single event, and map that to an empty object 
var source = Rx.Observable 
    .range(0, 1) 
    .map(function makeObject() { return {}; }); 

// map the empty object and give each one a type property with the 
// value randomly chosen between "a" or "b" 
var typed = source.map(function type(obj) { 
    obj.type = _.sample(['a', 'b']); // obj.type will randomly be 'a' or 'b' 
    return obj; 
}); 

// create an observable that only contains "a" 
var a = typed.filter(function(obj) { 
    return obj.type === 'a'; 
}); 

// create an observable that only contains "b" 
var b = typed.filter(function(obj) { 
    return obj.type === 'b'; 
}); 

// merge both observables and log the result in the subscription 
Rx.Observable.merge(a, b).subscribe(function(obj) { 
    console.log(obj); 
}); 

Я бы ожидать, что этот последний объединенный поток всегда будет производить один объект с любой obj.type === 'a' или obj.type === 'b', а затем полной.

Однако каждый раз, когда я запускаю этот скрипт, я получаю различные результаты, некоторые ожидаемые, некоторые неожиданные.

Ожидаемый результат "а":

{ type : 'a' } 

Ожидаемый результат "б":

{ type : 'b' } 

Неожиданное как:

{ type : 'a' } 
{ type : 'b' } 

И, иногда я не получаю никакого вывода вообще. Что мне здесь не хватает?

+0

Это больше упражнений в процессе обучения RX? Или вы планируете развертывать этот код? Потому что, по крайней мере, вы заявили, что это серьезный излишний оператор. – paulpdaniels

+0

Это упрощенная версия фактического кода, чтобы проиллюстрировать проблему. Настоящий код является частью более сложной системы. – Stephen

ответ

4

Проблема связана с ленивым характером RX:

У вас есть две подписки, которые получают созданное слияние вызова, каждые подписку на результатах в оценке всех наблюдаемых операторов.

Что означает:

абонементом -> может привести либо:

  • Пункт А генерируется, а затем излучается.
  • пункт Ь генерируется, а затем отфильтровывают

подписку б -> то же самое, либо:

  • Пункт б генерируется, а затем излучается
  • Пункт А генерируется, а затем отфильтровывают

Если вы объедините эти потоки, вы получите любой из этих результатов: только a, только b, и & b, ни один из них.

Подробнее

Давайте посмотрим на более простом примере:

var source = Rx.Observable 
    .range(0, 1) 
    .map(function() { return Math.random(); }) 

В настоящее время в обычных системах паб-суб, можно было бы ожидать, что, если я добавлю 2 подписчиков, каждый выход подписчик то же самое значение:

source.subscribe(function(x){console.log("sub 1:" + x)}) 
source.subscribe(function(x){console.log("sub 2:" + x)}) 

только они не являются, каждый из них будет печатать другое значение, поскольку каждая подписка называет math.random() Aga в.

И хотя это немного странно, на самом деле это правильное поведение наблюдаемых rx, каждая новая подписка вызывает новую оценку наблюдаемого потока.

Merge подписаться на эти два наблюдаемых (что означает, что два значения были созданы вместо одного) и испускать значения для нового наблюдаемого.

Чтобы избежать такого поведения, мы можем использовать операторы публикации RX. Там более подробное объяснение на те здесь:

http://www.introtorx.com/content/v1.0.10621.0/14_HotAndColdObservables.html

Таким образом, в этом случае:

var source = Rx.Observable 
    .range(0, 1) 
    .map(function makeObject() { return {}; }); 

var typed = source.map(function type(obj) { 
    obj.type = _.sample(['a', 'b']); // obj.type will randomly be 'a' or 'b' 
    return obj; 
}).replay().refCount(); 
+0

Спасибо за ответ, но я до сих пор не совсем понимаю, почему результат «только a, только b, и a & b, [или] ни один из них». Кроме того, добавление '.share()' так, как вы указали, не изменило результаты. – Stephen

+0

Итак, я добавил '.share()' * после * случайное сопоставление, и это исправило мою проблему. Но я не понимаю, почему. Не могли бы вы уточнить? – Stephen

+0

Я добавляю более подробную информацию, я также изменил на replay.refCount(), поскольку он больше подходит для получения желаемого результата. (share() является short для publish.refCount(), что приведет к тому, что вторая подписка не получит значение, поскольку уже опубликованный наблюдаемый уже закончился) – Yshayy

Смежные вопросы