У меня есть соединение с websocket, которое генерирует внутренние сообщения с помощью ReplaySubect. Я обрабатываю эти события и добавляю задержку к определенным сообщениям. Внутри я использую publish(). RefCount() дважды, один раз на внутреннем ReplaySubject и снова в опубликованном потоке вывода.Каков правильный способ управления потоками RxJS и опубликовать наблюдение за результатами?
Должен ли внутренний объект использовать как «публикацию», так и «refCount»? Я использую «публикацию», потому что у меня есть несколько подписчиков, но я не совсем уверен, когда использовать «refCount».
Можно ли просто избавиться от внутреннего предмета? Будет ли это очищать все остальное?
Кто поддерживает «eventStream» должен получить самую последнюю версию, но соединение не следует ждать каких-либо абонентов
Пример кода:
function Connection(...) {
var messageSubject = new Rx.ReplaySubject(1);
var messageStream = messageSubject.publish().refCount();
// please ignore that we're not using rxdom's websocket.
var ws = new WebSocket(...);
ws.onmessage = function(messageEvent) {
var message = JSON.parse(messageEvent.data);
messageSubject.onNext(message);
}
ws.onclose = function(closeEvent) {
messageSubject.dispose(); // is this all I need to dispose?
}
var immediateRevisions = messageStream
.filter((e) => e[0] === "immediate")
.map((e) => ["revision", e[1]]);
var delayedRevisions = messageStream
.filter((e) => e[0] === "delayed")
.map((e) => ["revision", e[1]]).delay(1000);
var eventStream = Rx.Observable.merge(immediateRevisions, delayedRevisions).publish().refCount();
Object.defineProperties(this, {
"eventStream": { get: function() { return eventStream; }},
});
}
// using the eventStream
var cxn = new Connection(...)
cxn.eventStream.subscribe((e) => {
if (e[0] === "revision") {
// ...
}
});
Это хороший ответ, но я рекомендую расширения на том, что «shareReplay» делает (делал?) И, возможно, на распоряжаться. Мне в основном нужно немного руководства по подключению и совместному использованию потоков. Я просмотрю ваши ссылки! Благодаря! –