2016-02-25 1 views
1

У меня есть соединение с websocket, которое генерирует внутренние сообщения с помощью ReplaySubect. Я обрабатываю эти события и добавляю задержку к определенным сообщениям. Внутри я использую publish(). RefCount() дважды, один раз на внутреннем ReplaySubject и снова в опубликованном потоке вывода.Каков правильный способ управления потоками RxJS и опубликовать наблюдение за результатами?

Должен ли внутренний объект использовать как «публикацию», так и «refCount»? Я использую «публикацию», потому что у меня есть несколько подписчиков, но я не совсем уверен, когда использовать «refCount».

Можно ли просто избавиться от внутреннего предмета? Будет ли это очищать все остальное?

Кто поддерживает «eventStream» должен получить самую последнюю версию, но соединение не следует ждать каких-либо абонентов

Пример кода:

function Connection(...) { 

    var messageSubject = new Rx.ReplaySubject(1); 
    var messageStream = messageSubject.publish().refCount(); 


    // please ignore that we're not using rxdom's websocket. 
    var ws = new WebSocket(...); 
    ws.onmessage = function(messageEvent) { 
     var message = JSON.parse(messageEvent.data); 
     messageSubject.onNext(message); 
    } 
    ws.onclose = function(closeEvent) { 
     messageSubject.dispose(); // is this all I need to dispose? 
    } 


    var immediateRevisions = messageStream 
     .filter((e) => e[0] === "immediate") 
     .map((e) => ["revision", e[1]]); 
    var delayedRevisions = messageStream 
     .filter((e) => e[0] === "delayed") 
     .map((e) => ["revision", e[1]]).delay(1000); 
    var eventStream = Rx.Observable.merge(immediateRevisions, delayedRevisions).publish().refCount(); 

    Object.defineProperties(this, { 
     "eventStream": { get: function() { return eventStream; }}, 
    }); 

} 


// using the eventStream 
var cxn = new Connection(...) 
cxn.eventStream.subscribe((e) => { 
    if (e[0] === "revision") { 
     // ... 
    } 
}); 

ответ

2

публиковать и refCounting в основном то, что делает shareReplay в RxJS4. Честно говоря, вы должны просто позволить вашему наблюдаемому быть «теплым», а затем использовать ReplaySubject в качестве подписчика, если вы действительно хотите гарантировать, что последнее сообщение будет перенаправлено новым подписчикам, даже если количество подписчиков упадет ниже одного. например:

const wsStream = Observable.create(observer => { 
    ws.onmessage = message => observer.next(message); 
    ws.onclose =() => observer.complete(); 
}); 

const latestWsMessages = new ReplaySubject(1); 
wsStream.subscribe(latestWsMessages); 

Обязательно ознакомьтесь как Наблюдаемая работа: после создания наблюдаемого, как правило, каждый абонент позвонит подписке (холодную), но в этом случае, вы, вероятно, хотите горячий наблюдаемую, так что у вас есть несколько подписчиков, имеющих подписку. См. Andre's video here и RxJS docs on creating observables для получения дополнительной информации.

Кроме того, полезными, поскольку классы могут быть, выглядит как в этом случае вы просто хотите функцию makeWebsocketObservable(WebsocketConfig): Observable<WebsocketEvent>

+0

Это хороший ответ, но я рекомендую расширения на том, что «shareReplay» делает (делал?) И, возможно, на распоряжаться. Мне в основном нужно немного руководства по подключению и совместному использованию потоков. Я просмотрю ваши ссылки! Благодаря! –