2016-01-05 9 views
0

В следующем фрагменте кода наблюдаемый должен срабатывать каждые 300 миллисекунд. Я приправил его, имитируя фоновой активности, которая занимает 1 секунду. Я ожидал, что, поскольку я использую планировщик, который использует пул потоков под ним, промежуточный наблюдаемый будет продолжать стрелять по новому потоку каждые 300 миллисекунд. Вместо этого происходит то, что наблюдаемый интервал ждет целую секунду каждый раз, прежде чем снова отстреливаться. Это желаемое поведение? Как можно заставить его срабатывать параллельно, если задача занимает больше требуемого времени?Наблюдаемые по интервалу RxJava наблюдаемые занимают больше времени, чем указано

Вот код:

Observable 
      .interval(300, TimeUnit.MILLISECONDS, Schedulers.io()) 
       .doOnNext(new Action1<Long>() { 
        @Override 
        public void call(Long aLong) { 
         System.out.println("action thread: " + Thread.currentThread()); 
         try { 
          Thread.sleep(1000); 
         } catch (InterruptedException e) { 
          e.printStackTrace(); 
         } 
        } 
       }) 
       .map(new Func1<Long, Float>() { 
        @Override 
        public Float call(Long aLong) { 
         final double result = Math.random(); 
         return new Float(result); 
        } 
       }) 
       .takeWhile(new Func1<Float, Boolean>() { 
        @Override 
        public Boolean call(Float aFloat) { 
         return aFloat >= 0.01f; 
        } 
       }) 
      .subscribe(new Action1<Float>() { 
       @Override 
       public void call(Float aFloat) { 
        System.out.println("observing thread: " + Thread.currentThread()); 
        System.out.println(aFloat); 
       } 
      }); 

ответ

2

Наблюдаемые последовательны в природе, так что если вы поставите во сне, например, как в примере, вы блокируете всю последовательность. Чтобы выполнить фоновое вычисление, вам нужно переместить его в другой поток через observeOn или subscribeOn. В этом случае, вы можете flatMap/concatMapEager в другой наблюдаемой, что делает сон и объединить результаты обратно в основной последовательности:

Observable.interval(300, TimeUnit.MILLISECONDS) 
.flatMap(t -> Observable.fromCallable(() -> { 
     Thread.sleep(1000); 
    }).subscribeOn(Schedulers.io())) 
.map(...) 
.takeWhile(...) 
.subscribe(...) 
+0

Спасибо, это решить мою проблему. Я смотрел глубже в код, который выполняет планирование, и здесь четко указано, что он будет блокировать выполняемые операции, поэтому они должны выполняться в другом месте. Мне потребовалось немного времени, чтобы обернуть голову вокруг вашего конкретного использования flatMap, но, похоже, это имеет общий смысл. flatMap - метод удобства, который объединяет все отдельные потоки в один объединенный поток Observable. Теперь все имеет смысл. Спасибо! – preslavrachev

Смежные вопросы