2017-01-29 3 views
3

Ниже приведен краткий фрагмент кода реактивного кода (RxJs)RxJs Наблюдаемое завершает несколько раз

let subj = new Rx.Subject(); 
 
let chain = subj 
 
    .switchMap(v => Rx.Observable.of(10*v).do(vv => console.log("Switch map", vv))) 
 
    .share() 
 
    .take(1); 
 

 

 
function subscribe(){ 
 
    chain.subscribe(v => console.log("Next", v), 
 
        err => console.log("Error",err), 
 
       () => console.log("Completed")); 
 
    chain.subscribe(v => console.log("Next2", v), 
 
        err => console.log("Error2",err), 
 
       () => console.log("Completed2")); 
 
    subj.next(Math.random()); 
 
} 
 

 
subscribe(); 
 
subscribe(); 
 
subscribe();
<script src="https://unpkg.com/rxjs/bundles/Rx.min.js"></script>

В соответствии с documentationchain является Observable, который должен печатать излучаемый значение * 10 (switchMap), при печати только один раз, независимо от того, сколько у него подписчиков (share), сделайте это только для первого испущенного значения, а затем заполните.

Первые две пули работают нормально, но последний не работает. Вот результат я получаю:

Switch map 9.022491050934722 
Next 9.022491050934722 
Completed 
Next2 9.022491050934722 
Completed2 
Switch map 9.172999425126836 
Next 9.172999425126836 
Completed 
Next2 9.172999425126836 
Completed2 
Switch map 6.168790337405257 
Next 6.168790337405257 
Completed 
Next2 6.168790337405257 
Completed2 

Как вы можете видеть, chain становится завершенные несколько раз.
Что позволяет заполнить одно и то же значение Observable несколько раз?

ответ

4

share ярлык для комбинации publish и refCount, это означает, что поток только «горячий» до тех пор, пока существует, по крайней мере один абонент, так что после того, как поток завершается, все активные абоненты автоматически отписался, что в свою очередь сбрасывает поток, поскольку есть 0 подписчиков. Также: вы должны поставить take(1) перед share, поскольку любая следующая операция влияет на горячее состояние.

Как сделать поток "истинно" общий/горячий, independed от любых абонентов: Использование publish и connect поток:

let subj = new Rx.Subject(); 
 
let chain = subj 
 
    .switchMap(v => Rx.Observable.of(10*v).do(vv => console.log("Switch map", vv))) 
 
    .take(1) 
 
    .publish(); 
 
chain.connect(); 
 

 
function subscribe(){ 
 
    chain.subscribe(v => console.log("Next", v), 
 
        err => console.log("Error",err), 
 
       () => console.log("Completed")); 
 
    chain.subscribe(v => console.log("Next2", v), 
 
        err => console.log("Error2",err), 
 
       () => console.log("Completed2")); 
 
    subj.next(Math.random()); 
 
} 
 

 
subscribe(); 
 
subscribe(); 
 
subscribe();
<script src="https://unpkg.com/rxjs/bundles/Rx.min.js"></script>

+0

Так вы говорите, что каждый ' Наблюдаемый', который возвращается из 'share', сбрасывается и переоценивается после его завершения? – meltedspark

+0

Не автоматически, но как только появляется новый подписчик. – olsn

+0

Хорошо, это объясняет вещи. Имейте это в документации. Я не получил часть 'take' after' share'. Можете ли вы подробно рассказать? – meltedspark

Смежные вопросы