2016-01-04 3 views
0

Фон: Я ищу для вызова веб-службы внутри метода вызова OnSubscribe. Существует пользовательский класс Subscriber, который также является подписчиком этого Observable. (Следующий код является приближение же)Метод OnSubscribe.call вызывается дважды

Выпуск: Видя, что OnSubscribe.call метод вызывается дважды.

Соотношение между Observable.create (...) ', subscribe (...) и, наконец, преобразование наблюдаемого toBlocking мне не ясно. Кажется, что строка ниже добавляет поведение и как-то снова вызывает OnSubscribe.call. Вызов toBlocking, который, как я предполагаю, в конечном счете должен быть вызван - непосредственно перед тем, как результат должен быть возвращен веб-сервером (сервлет/контроллер, которые не являются Rx в природе).

observable.toBlocking().forEach(i-> System.out.println(i)); 

Полный код Ниже

public static void main(String args[]){ 
    Observable<Integer> observable = Observable.create(new Observable.OnSubscribe<Integer>() { 
     @Override 
     public void call(Subscriber<? super Integer> observer) { 
      System.out.println("In OnSubscribe Create: "+observer.isUnsubscribed()); 
      try { 
       if (!observer.isUnsubscribed()) { 
        for (int i = 1; i < 5; i++) { 
         observer.onNext(i); 
        } 
        observer.onCompleted(); 
       } 
      } catch (Exception e) { 
       observer.onError(e); 
      } 
     } 
}); 

observable.subscribe(new Subscriber<Integer>() { 
    @Override 
    public void onNext(Integer item) { 
     System.out.println("Next: " + item); 
    } 

    @Override 
    public void onError(Throwable error) { 
     System.err.println("Error: " + error.getMessage()); 
    } 

    @Override 
    public void onCompleted() { 
     System.out.println("Sequence complete."); 
    } 
}); 
    observable.toBlocking().forEach(i-> System.out.println(i)); 
} 

Выход

In OnSubscribe Create: false 
Next: 1 
Next: 2 
Next: 3 
Next: 4 
Sequence complete. 
In OnSubscribe Create: false 
For Each Printing 1 
For Each Printing 2 
For Each Printing 3 
For Each Printing 4 

Отвечая на собственный вопрос: Так я обрабатывал результаты через Foreach ошибалась. Это само по себе является подписчиком и, следовательно, снова вызывает метод вызова. Правильный способ делать то, что я делал, было через CountDownLatch.

Это также разъясняет мое другое сомнение - в контексте наличия точки, в которой мы, наконец, дождаемся завершения работы непосредственно перед возвратом ответчика в контроллере/сервлете, означало бы использовать защелку с таймаутом.

Код ниже.

public static void main(String args[]) throws Exception { 
    CountDownLatch latch = new CountDownLatch(4); 
    Observable<Integer> observable = Observable.create(new Observable.OnSubscribe<Integer>() { 
     @Override 
     public void call(Subscriber<? super Integer> observer) { 
      System.out.println("In OnSubscribe Create: "+observer.isUnsubscribed()); 
      try { 
       if (!observer.isUnsubscribed()) { 
        for (int i = 1; i < 5; i++) { 
         observer.onNext(i); 
         latch.countDown(); 

        } 
        observer.onCompleted(); 
       } 
      } catch (Exception e) { 
       observer.onError(e); 
      } 
     } 
    }); 

    observable.subscribe(new Subscriber<Integer>() { 
     @Override 
     public void onNext(Integer item) { 
      System.out.println("Next: " + item); 
     } 

     @Override 
     public void onError(Throwable error) { 
      System.err.println("Error: " + error.getMessage()); 
     } 

     @Override 
     public void onCompleted() { 
      System.out.println("Sequence complete."); 
     } 
    }); 

    latch.await(); 
    System.out.println(" Wait for Latch Over"); 
} 
+0

Возможно, я ошибаюсь, но вы, как правило, побеждаете цель RxJava: если вы собираетесь использовать ее в блокирующем режиме, вы можете уйти с простой петлей и не нанести накладные расходы RxJava , – AndroidEx

+0

@LordRaydenMK - Это единственный способ ответить на ваш собственный вопрос (или через комментарии), если вы новичок в StackOverflow. –

+0

@AndroidEx - Извините, что я новичок в RX и выяснил это. Но предположим, что у нас есть гигантское монолитное приложение, работающее более десятилетия, я полагаю, мы будем медленно менять его для работы с RX. Кроме того, поскольку все еще не является реактивным в мире веб-приложений (приложения и веб-серверы), мы все равно будем иметь родительский поток, ожидающий завершения деятельности, прежде чем отвечать на запросы. –

ответ

0

toBlocking можно отключить, чтобы сделать подписку с защелкой, следовательно, ваши две подписки.

Но я думаю, что вы были правы в использовании toBlocking().forEach() и должны сохранить эту часть в качестве решения.

Проблема заключается в вашей второй подписке, сделанной по телефону subscribe. Есть ли настоящая потребность в этом subscribe? Если речь идет только о протоколировании и фактически не изменяя поток данных, вы можете вместо этого использовать doOnNext, doOnError и doOnCompleted.

Если ваш вариант использования немного сложнее, чем отражен в вашем вопросе, и вам действительно нужно подождать на двух «параллельных» асинхронных процессах, вам может потребоваться присоединиться и подождать, используя CountDownLatch (или аналогичный). Имейте в виду, что если вы хотите параллелизма, RxJava не является агностиком в этом отношении, и вам нужно управлять им, используя Schedulers, subscribeOn/observeOn.

+0

Выполнял только регистрацию данных. Зная, помогают ли операторы. –

2

Это не может быть все, что отношение, так как вы уже решили эту проблему в вашем случае, но создание наблюдаемых с использованием сырого OnSubscribe ошибки, в общем. Лучшим подходом является использование одного из существующих вспомогательных классов: SyncOnSubscribe/AsyncOnSubscribe (с RxJava 1.0.15) или AbstractOnSubscribe (до RxJava v1.1.0). Они дают вам готовые рамки, которые управляют состоянием наблюдаемого и противодавлением, оставляя вас иметь дело (в основном) с вашей собственной логикой.

Смежные вопросы