2016-01-18 4 views
2

Обнаружены вполне естественно, если они построены из конечных данных.Как onComplete действительно работает в RxJS

import {Observable, Subject} from "rx"; 

let stream0$ = Observable.of("1", "2", "3"); 
let stream1$ = stream0$.map(x => x); 

stream1$.subscribe(
    (val) => { console.log("onNext", val) }, 
    (err) => { console.log("onError", err) }, 
() => { console.log("onCompleted") } 
); 

// onNext 1 
// onNext 2 
// onNext 3 
// onCompleted 

Или нет, если нет. Но как насчет наблюдаемых подписчиков на темы? Например:

import {Observable, Subject} from "rx"; 

let subj$ = new Subject(); 
let stream1$ = subj$.map(x => x); 

stream1$.subscribe(
    (val) => { console.log("onNext", val) }, 
    (err) => { console.log("onError", err) }, 
() => { console.log("onCompleted") } 
); 

subj$.onNext("foo"); 

// onNext foo 

"OnCompleted" не зарегистрирован, хотя источник закончился. Можем ли мы передать это событие «конец» на stream1$. Я не нашел никакой информации об этом важном материале в документах. Было бы здорово увидеть такую ​​диаграмму, как здесь, Hot and Cold observables : are there 'hot' and 'cold' operators?, чтобы прибить этот поток событий.

ответ

4

С темой вы полностью контролируете ситуацию. Rx.Subject реализует интерфейс наблюдателя, и это означает, что вы используете observer interface, когда вы вызываете onNext.

Субъект

предположить, что все сериализации и грамматическая правильность обрабатываются абонентом субъекта.

Это означает, среди прочего, что до вас дошло сообщение об ошибке и ошибке. Для завершения сигнала используйте onCompleted. FYI, здесь вышеупомянутый грамматика:

Эта грамматика позволяет наблюдаемые последовательности, чтобы отправить любое количество (0 или более) onNext сообщений на подписанный экземпляр наблюдателя, необязательно, с последующим однократным успехом (onCompleted) или недостаточности (onError) сообщение.

Единственное сообщение, указывающее, что наблюдаемая последовательность закончила гарантирует, что потребители наблюдаемой последовательности могут детерминировано установить, что она безопасна для выполнения операций по очистке .

Одиночный отказ дополнительно гарантирует, что семантика прерывания может быть для операторов, работающих с несколькими наблюдаемыми последовательностями.

ПРИМЕЧАНИЕ. Для RxJS v5 интерфейс наблюдателя изменился, ср. new interface

+1

Да, но почему "OnCompleted" не передается вниз по течению, как "onNext" делает? Вероятно, вы неправильно поняли мой вопрос. –

+1

? Если вы выполняете 'subj $ .onCompleted();' it should. Это не? Ср http://jsfiddle.net/zrLn7b5t/1/ – user3743222

+1

Да, я был неправ. Все работает так, как ожидалось. –

3

How Complete and Error events actually work in RxJS следующее исследование, которое я сделал.

Цитата себя оттуда.

  1. Завершение не следует воспринимать как «событие в конце программы» или что-то. Это очень специфическая вещь. Существует всего три возможных способа завершения потока: 1) Быть конечным Observable и полностью естественным образом 2) Быть Subject и получить неотложный вызов onCompleted(). 3) Получить событие завершения из восходящего потока. Любая форма завершения процесса/отмены подписки не заполняет потоки.

  2. Завершение прекращения потока. По завершении ничего не происходит в потоке.

  3. Завершение передается вниз по течению. Observable, полученный от Subject, завершается, если/когда Subject завершает работу. Subject подписка на Observable завершается, если/когда Observable завершает работу.

Смежные вопросы