2016-06-06 2 views
6

Я учусь о операторе RxJava, и я нашел их ниже код ничего не печатает:Как использовать RxJava Interval Оператор

public static void main(String[] args) { 

    Observable 
    .interval(1, TimeUnit.SECONDS) 
    .subscribe(new Subscriber<Long>() { 
     @Override 
     public void onCompleted() { 
      System.out.println("onCompleted"); 
     } 

     @Override 
     public void onError(Throwable e) { 
      System.out.println("onError -> " + e.getMessage()); 
     } 

     @Override 
     public void onNext(Long l) { 
      System.out.println("onNext -> " + l); 
     } 
    }); 
} 

Как ReactiveX, interval

создать Observable, который излучает последовательность целых чисел, отстоящих на определенный промежуток времени

Я допустил ошибку или забыл про что-то?

ответ

7

Вы должны блокироваться, пока наблюдаемое не израсходуется:

public static void main(String[] args) throws Exception { 

    CountDownLatch latch = new CountDownLatch(1); 

    Observable 
    .interval(1, TimeUnit.SECONDS) 
    .subscribe(new Subscriber<Long>() { 
     @Override 
     public void onCompleted() { 
      System.out.println("onCompleted"); 
      // make sure to complete only when observable is done 
      latch.countDown(); 
     } 

     @Override 
     public void onError(Throwable e) { 
      System.out.println("onError -> " + e.getMessage()); 
     } 

     @Override 
     public void onNext(Long l) { 
      System.out.println("onNext -> " + l); 
     } 
    }); 

    // wait for observable to complete (never in this case...) 
    latch.await(); 
} 

Вы можете добавить .take(10), например, чтобы увидеть наблюдаемую полным.

+0

Ницца, чистый пример CountdownLatch для загрузки. – mtyson

+0

в Rxjava 2, есть ли способ использовать интервал с одиночным? –

2

Положите Thread.sleep(1000000) после подписания и вы увидите, что это работает. Observable.interval работает по умолчанию на Schedulers.computation(), поэтому ваш поток запускается в потоке, отличном от основного потока.

+0

Да, это правильно. Интервальный оператор работает по асинхронному способу, поэтому мне нужно блокировать ожидание abd, чтобы получить результат от него. Благодаря! –

0

Как они уже говорят, интервал работает асинхронно, поэтому вам нужно дождаться завершения всех событий.

Вы можете получить подписку после подписания, а затем использовать TestSubcriber, который является частью платформы reactiveX, и который даст вам возможность ждать завершения всех событий.

 @Test 
public void testObservableInterval() throws InterruptedException { 
    Subscription subscription = Observable.interval(1, TimeUnit.SECONDS) 
       .map(time-> "item emitted") 
       .subscribe(System.out::print, 
         item -> System.out.print("final:" + item)); 
    new TestSubscriber((Observer) subscription) 
      .awaitTerminalEvent(100, TimeUnit.MILLISECONDS); 
} 

У меня есть в моем GitHub больше примеров, если вам нужно https://github.com/politrons/reactive/blob/master/src/test/java/rx/observables/scheduler/ObservableAsynchronous.java

Смежные вопросы