2016-10-17 4 views
0

Я пытаюсь написать оболочку на основе websocket rxjs.rxjs pausableBuffered несколько подписей

И я борюсь с моим пониманием rxjs.

У меня есть поток паузы, который должен приостанавливать приостанавливаемые буферизованные потоки при возникновении ошибки и возобновлять их, как только я получу «ок» форму websocket.

Как-то только первая подписка на мои приостановленные буферизованные потоки запущена. С этого момента только очередь складывается выше.

Я подготовил jsbin, чтобы воспроизвести проблему.

https://jsbin.com/mafakar/edit?js,console

Там в "сообщение" RECIVED поток только огни для первой подписки. И затем q и observer начинают складываться.

У меня как-то есть ощущение, что это о горячем и холодном восприятии, но я не могу понять проблемы. Буду признателен за любую помощь.

Спасибо заранее!

ответ

1

Это не холодный/горячий вопрос. Что вы делаете в своем onMessage, подписывайтесь, а затем распоряжайтесь. Распоряжение завершает последовательность. OnMessageStream должны быть подписаны только один раз, например, в конструкторе:

this.onmessageStream.subscribe(message => console.log('--- msg --- ', message.data)); 

подписаться блок, в том числе Dispose должны быть удалены.

Также обратите внимание, что вы использовали replaySubject без счета, это означает, что очередь содержит все предыдущие значения. Если это не будет желательным поведением, рассмотрите его изменение до .replaySubject(1)

working jsbin.

1

Поскольку @Meir указал dispose в блоке подписки, это не так, поскольку его поведение является недетерминированным. В общем, я бы избегал использования Subjects и вместо этого полагался на заводские методы. Вы можете увидеть рефакторинга версию здесь: https://jsbin.com/popixoqafe/1/edit?js,console

Быстрая разбивка изменений:

class WebSocketWrapper { 
    // Inject the pauser from the external state 
    constructor(pauser) { 

    // Input you need to keep as a subject 
    this.input$ = new Rx.Subject(); 

    // Create the socket 
    this._socket = this._connect();  

    // Create a stream for the open event 
    this.open$ = Rx.Observable.fromEvent(this._socket, 'open'); 

    // This concats the external pauser with the 
    // open event. The result is a pauser that won't unpause until 
    // the socket is open. 
    this.pauser$ = Rx.Observable.concat(
     this.open$.take(1).map(true) 
     pauser || Rx.Observable.empty() 
    ) 
     .startWith(false); 

    // subscribe and buffer the input 
    this.input$ 
     .pausableBuffered(this.pauser$) 
     .subscribe(msg => this._socket.send(msg)); 

    // Create a stream around the message event 
    this.message$ = Rx.Observable.fromEvent(this._socket, 'message') 

     // Buffer the messages 
     .pausableBuffered(this.pauser$) 

     // Create a shared version of the stream and always replay the last 
     // value to new subscribers 
     .shareReplay(1); 
    } 

    send(request) { 
    // Push to input 
    this.input$.onNext(request); 
    } 

    _connect() { 
    return new WebSocket('wss://echo.websocket.org'); 
    } 
} 

Как и в сторону, вы должны также не полагаться на внутренние переменные, как source, которые не предназначены для внешнего потребления. Хотя RxJS 4 относительно стабилен, поскольку они не предназначены для общественного потребления, то может быть изменен из-под вас.

+0

Я изменил свой код после вашего совета! (Спасибо большое) Теперь у меня есть еще одна проблема: после изменения наблюдаемых на «fromEvent» я не знаю, как «заменить» потоки (сохранять текущие подписчики), когда я отключусь и снова подключись с моего сервера. Прежде чем я смог «восстановить» свою частную функцию и вызвать onNext, чтобы испустить события. –

Смежные вопросы