2017-01-07 2 views
1

Имея некоторые проблемы в достижении того, что я хочу с RxJS5 - У меня есть простая Наблюдаемая цепь, начались с Rx.Observable.interval:Подождите, прикованные наблюдаемые для завершения

const Rx = require('rxjs'); 

var i = 0; 

const obs = Rx.Observable.interval(100) 
    .flatMap(function() { 
     return Rx.Observable.timer(Math.ceil(500*Math.random())) 
      .map(function(val){ 
       console.log(' => These should all log first => ', val); 
       return i++; 
      }); 
    }) 
    .take(5) 
    .merge() // this doesn't seem to do what I want to do 
    .map(function (val) { 
     console.log('all done = > ', val); 
    }); 

obs.subscribe(); 

Вышеуказанных журналы это:

=> These should all log first => 0 
all done = > 0 
=> These should all log first => 0 
all done = > 1 
=> These should all log first => 0 
all done = > 2 
=> These should all log first => 0 
all done = > 3 
=> These should all log first => 0 
all done = > 4 

Ищу войти следующее:

=> These should all log first => 0 
=> These should all log first => 0 
=> These should all log first => 0 
=> These should all log first => 0 
=> These should all log first => 0 

all done = > [0,1,2,3,4] 

это ясно, что мы не ждем всех тим er наблюдаемых до конца, так как вы увидите «все сделано!». многократно заносится в журнал, вкрапленное «Все должны регистрироваться в первую очередь».

Как я могу получить вывод, который я ищу?

Для этого мы можем использовать zip, но API для zip не подходит для этого прецедента, потому что у нас нет всех наблюдателей таймера в одном месте одновременно!

Если мой вопрос не был достаточно ясен, здесь является аналогом того, что я хочу сделать, мы блокируем все обратные вызовы, пока мы произвольно не закончим, и мы собрали все результаты:

const async = require('async'); 
var i = 0; 

async.forever(function(cb){ 

    process.nextTick(function(){ 
     console.log('These should all log first'); 
     const err = i++ === 5; 
     cb(err, i); 
    }); 

}, function done(err, results){ 
    // let's pretend results contains all the i values 
    console.log('all done'); 
}); 
+0

отмечает, что если подставить takeLast() для слияния(), мы зарегистрируем «все сделано» после того, как все «Они должен все журнал первый», который является «правильным», но «все сделано» регистрируются 50x! Я хочу, чтобы «все сделано» было зарегистрировано только один раз :) –

+0

'take' не доставляет их до конца, он просто заканчивает поток, когда его достаточно видно. И «слияние» в одном потоке ничего не делает. Посмотрите, например. http://rxmarbles.com/#merge, который дает хорошие иллюстрации о том, что происходит. – jonrsharpe

+0

Хм, разве вы не ожидали, что «все сделано» будет регистрироваться столько раз, сколько есть элементов, поскольку вы их сопоставляете?Основываясь на этом коде, я бы на самом деле ожидал увидеть, что журнал событий в консоли карты столько раз, сколько вы «берете». Я думаю, что ваше использование слияния, вероятно, не делает то, что вы ожидаете, потому что вы не кормите сливаете то, что считаете себя (оно не просто сидит и ждет все ваши «взятые» объекты, а затем объединяет их сразу). –

ответ

0

Обратите внимание, что это в основном дает лесозаготовительной порядок, что я ожидал увидеть, но я не думаю, что это правильный/лучший способ сделать это:

const {Observable} = require('rxjs'); 

    const obs = Observable.interval(100) 
     .flatMap(function() { 
      return Observable.timer(Math.ceil(500*Math.random())) 
       .map(function(val){ 
        console.log(' => These should all log first => ', val); 
       }); 
     }) 
     .take(5) 
     .takeLast() // <<<<<<<<<< 
     .map(function() { 
      console.log('all done'); 
     }) 
     .take(1) // <<<<<<<<<< 

    obs.subscribe(); 

Я думаю, что есть еще лучший способ для достижения этой цели.

+2

Вы можете заменить все между 'take (50)' и 'subscribe' на' last (function() {console.log ('all done');}) 'или просто переместить этот журнал в полный обратный вызов по подписке , – jonrsharpe

+0

Замените 'flatMap' на' concatMap' (если это не имеет значения, в каком порядке ваши рандомизированные таймеры завершают и возвращают значение), замените 'takeLast' на' toArray' и удалите 'take (1)' –

0

@jonsharpe дал мне этот ответ, который в основном работает. Проблема действительно может быть устранена до Rx.Observable.interval, мы можем избавиться от отображения Rx.Observable.timer. Вот наш основной ответ:

const Rx = require('rxjs'); 

const obs = Rx.Observable.interval(100) 
    .take(5) 
    .map(function(v){ 
     console.log(v); 
     return v; 
    }) 
    .reduce(function (prev, curr) { 
     return prev.concat(curr); 
    },[]) 
    .last(function (results) { 
     return results; 
    }) 
    .map(function(v){ 
     console.log(v); 
    }); 


obs.subscribe(); 

Я был бы очень интересно, однако, таким образом, чтобы сделать это без уменьшить.

+0

'reduce' необходимо для сбора всех значений в массив. Хотя я не понимаю, почему вам нужно 'last', сокращение приводит к одному результату. Также семантически лучше вызывать 'all done' в' subscribe' вместо 'map' с пустым' subscribe'. –

+0

@jonrsharpe вы можете быть правы, но это противоречит семантике сокращения, не так ли :) –

+0

Даже если это не то, чего хочет OP, и, как вы говорите, интересна семантика «уменьшения» и «последней» , это интересный фрагмент кода. Я захватил его, чтобы помочь моему развивающемуся пониманию Rx. –

0

Так требование:

  • Пусть верхнего уровня интервала прогонов N раз.
  • Каждый интервал должен возвращать массив наблюдаемых таймеров Y.
  • Далее следует проследить N раз, каждый раз с массивом Y наблюдаемых.

Это делает это. Это немного наивно, но это действительно работает.

let timerArrLength = [ 4, 2, 3 ]; 
    let svc = Rx.Observable.interval(1000) 
     .take(timerArrLength.length) 
     .map(function (index) { 
      let arr = []; 
      for (let i = 0; i < timerArrLength [ index ]; i++) { 
       arr.push (Rx.Observable.timer (1000)); 
      } 
      return arr; 
     }); 


     svc.subscribe(
      function onNext(v){ 
       console.log('=> v =>',v); 
      }, 
      function onError(e){ 
       console.error(e); 
      }, 
      function onComplete(){ 
       console.log('complete'); 
      } 
     ); 
+0

позвольте мне проверить это :) –

+0

Примечание. Я удалил первый let var, вам он не нужен, просто используйте длину массива. –

+0

эй Тим, то, что я выхожу из системы, представляет собой кучу наблюдаемых, посмотрите, можете ли вы получить все значения, которые будут развернуты в одном конечном обратном вызове. Для целей дистилляции просто избавитесь от таймера и просто используйте интервал. –

Смежные вопросы