2016-02-12 6 views
1

У меня есть 2 службы в моем приложении. Служба youTube, которая загружает потоки комментариев в YouTube и комментарии по проводке и службу комментариев, которая управляет загрузкой партий комментариев. Служба комментариев относится только к моему приложению, а служба youTube - агностическая.Как использовать один поток RXJs для 2 разных вещей

У меня есть функция getCommentThreadsForChannel, которая загружает потоки комментариев. Основная реализация для этого - в службе youTube, но это вызвано службой комментариев, которая в основном просто вызывает возврат наблюдаемого из службы youTube.

Что касается моего контроллера, который вызывает это, это всего лишь наблюдаемая последовательность комментариев. Тем не менее, в моем commentService я хочу хранить эти потоки локально. Я хочу, чтобы пакет для хранения всех потоков всякий раз, когда я получаю еще 100 потоков, так что я не обрабатываю список для каждого нового бита данных. Я пришел с этим кодом:

getCommentThreadsForChannel(): Rx.Observable<ICommentThread> { 
     var threadStream: Rx.Observable<ICommentThread> = 
      this.youTubeService.getCommentThreadsForChannel(); 

     threadStream 
      .bufferWithCount(100) 
      .scan((allItems, currentItem) => { 
       currentItem.forEach(thread => { 
        allItems.push(thread); 
       }); 

       console.log(`Save items to local storage: ${allItems.length}`) 

       return allItems; 
      }, [] ); 

     return threadStream; 
    } 

Я думаю, что логика здесь дозирование нити и аккумулировать все потоки в один массив в порядке, но этот код не вызывается. Я предполагаю, что это потому, что я вообще не подписываюсь на эту тему.

Я не хочу подписываться здесь, поскольку это будет подписываться на базовый поток, и тогда у меня будет 2 подписки, и все данные будут загружаться дважды (есть много данных - для загрузки требуется около минуты все и более 30 вызовов из 100 потоков за раз).

Я в основном хочу здесь do, который не повлияет на поток, который передается контроллеру, но я хочу использовать логику буферизации и накопления RXjs.

Я предполагаю, что мне нужно поделиться или опубликовать поток в некотором роде, но я не успеваю использовать эти операторы раньше и не могу видеть, как я мог бы это сделать, не добавляя вторую подписку.

Как поделиться одним потоком и использовать его двумя разными способами без подписки дважды? Могу ли я создать какой-то пассивный поток, который подписывается только тогда, когда наблюдаемый он основан на подписках?

ответ

1

Я понял это в конце (с некоторой помощью от @MonkeyMagiic).

Мне пришлось делиться потоком, чтобы я мог делать 2 разных вещи с данными, буферизировать их и на каждое значение отдельно. Проблема заключалась в том, что оба эти потока должны были быть подписаны, но я не хотел подписываться на услуге - это должно быть сделано в контроллере.

Решение было объединить 2 потоков снова и игнорировать значения из буфера:

var intervalStream = Rx.Observable.interval(250) 
    .take(8) 
    .do(function(value){console.log("source: " + value);}) 
    .shareReplay(1); 

var bufferStream = intervalStream.bufferWithCount(3) 
    .do(function(values){ 
     console.log("do something with buffered values: " + values); 
    }) 
    .flatMap(function(values){ return Rx.Observable.empty(); }); 

var mergeStream = intervalStream.merge(bufferStream); 

mergeStream.subscribe(
    function(value){ console.log("value received by controller: " + value); }, 
    function(error){ console.log("error: " + error); }, 
    function(){ console.log("onComplete"); } 
); 

Выход:

"source: 0" 
"value received by controller: 0" 
"source: 1" 
"value received by controller: 1" 
"source: 2" 
"value received by controller: 2" 
"do something with buffered values: 0,1,2" 
"source: 3" 
"value received by controller: 3" 
"source: 4" 
"value received by controller: 4" 
"source: 5" 
"value received by controller: 5" 
"do something with buffered values: 3,4,5" 
"source: 6" 
"value received by controller: 6" 
"source: 7" 
"value received by controller: 7" 
"do something with buffered values: 6,7" 
"onComplete" 

JSBin

0

Я считаю, что вы ищете комбинацию из share и replay, к счастью, у RX есть shareReplay(bufferSize).

var threadStream: Rx.Observable<ICommentThread> = this.youTubeService 
       .getCommentThreadsForChannel() 
       .shareReplay(1); 

getCommentThreadsForChannel(): Rx.Observable<ICommentThread> { 
     threadStream 
      .bufferWithCount(100) 
      .scan((allItems, currentItem) => { 
       currentItem.forEach(thread => { 
        allItems.push(thread); 
       }); 

       console.log(`Save items to local storage: ${allItems.length}`) 

       return allItems; 
      }, [] ); 

     return threadStream; 
    } 

Использования групповой доли оператора будет гарантировать, что каждый дополнительный абонемент после первого не делает ненужный запрос и повторы, чтобы гарантировать, что все будущие подписки получить последнее уведомление.

+0

ха, только заметил, что мы оба работают по той же Компания! – MonkeyMagiic

+0

Стоит отметить, что 'shareReplay', поскольку он стоит, был удален из последней версии RxJS 5. [Подробнее см. Здесь] (http://stackoverflow.com/questions/35246873/sharereplay-in-rxjs-5). –

+0

Спасибо за ответ, но он не работает. Я боюсь. Я думаю, что проблема в том, что поток потока подписывается, когда он возвращается из этой функции, но bufferWithCount и поток сканирования не подписываются. Как я уже говорил, я не хочу подписываться здесь, я только хочу, чтобы это работало, когда поток потоков подписан. – Roaders

Смежные вопросы