У меня есть 2 службы в моем приложении. Служба youTube, которая загружает потоки комментариев в YouTube и комментарии по проводке и службу комментариев, которая управляет загрузкой партий комментариев. Служба комментариев относится только к моему приложению, а служба youTube - агностическая.Как использовать один поток RXJs для 2 разных вещей
У меня есть функция getCommentThreadsForChannel
, которая загружает потоки комментариев. Основная реализация для этого - в службе youTube, но это вызвано службой комментариев, которая в основном просто вызывает возврат наблюдаемого из службы youTube.
Что касается моего контроллера, который вызывает это, это всего лишь наблюдаемая последовательность комментариев. Тем не менее, в моем commentService я хочу хранить эти потоки локально. Я хочу, чтобы пакет для хранения всех потоков всякий раз, когда я получаю еще 100 потоков, так что я не обрабатываю список для каждого нового бита данных. Я пришел с этим кодом:
getCommentThreadsForChannel(): Rx.Observable<ICommentThread> {
var threadStream: Rx.Observable<ICommentThread> =
this.youTubeService.getCommentThreadsForChannel();
threadStream
.bufferWithCount(100)
.scan((allItems, currentItem) => {
currentItem.forEach(thread => {
allItems.push(thread);
});
console.log(`Save items to local storage: ${allItems.length}`)
return allItems;
}, [] );
return threadStream;
}
Я думаю, что логика здесь дозирование нити и аккумулировать все потоки в один массив в порядке, но этот код не вызывается. Я предполагаю, что это потому, что я вообще не подписываюсь на эту тему.
Я не хочу подписываться здесь, поскольку это будет подписываться на базовый поток, и тогда у меня будет 2 подписки, и все данные будут загружаться дважды (есть много данных - для загрузки требуется около минуты все и более 30 вызовов из 100 потоков за раз).
Я в основном хочу здесь do
, который не повлияет на поток, который передается контроллеру, но я хочу использовать логику буферизации и накопления RXjs.
Я предполагаю, что мне нужно поделиться или опубликовать поток в некотором роде, но я не успеваю использовать эти операторы раньше и не могу видеть, как я мог бы это сделать, не добавляя вторую подписку.
Как поделиться одним потоком и использовать его двумя разными способами без подписки дважды? Могу ли я создать какой-то пассивный поток, который подписывается только тогда, когда наблюдаемый он основан на подписках?
ха, только заметил, что мы оба работают по той же Компания! – MonkeyMagiic
Стоит отметить, что 'shareReplay', поскольку он стоит, был удален из последней версии RxJS 5. [Подробнее см. Здесь] (http://stackoverflow.com/questions/35246873/sharereplay-in-rxjs-5). –
Спасибо за ответ, но он не работает. Я боюсь. Я думаю, что проблема в том, что поток потока подписывается, когда он возвращается из этой функции, но bufferWithCount и поток сканирования не подписываются. Как я уже говорил, я не хочу подписываться здесь, я только хочу, чтобы это работало, когда поток потоков подписан. – Roaders