Фон: Я ищу для вызова веб-службы внутри метода вызова OnSubscribe. Существует пользовательский класс Subscriber, который также является подписчиком этого Observable. (Следующий код является приближение же)Метод OnSubscribe.call вызывается дважды
Выпуск: Видя, что OnSubscribe.call метод вызывается дважды.
Соотношение между Observable.create (...) ', subscribe (...) и, наконец, преобразование наблюдаемого toBlocking мне не ясно. Кажется, что строка ниже добавляет поведение и как-то снова вызывает OnSubscribe.call. Вызов toBlocking, который, как я предполагаю, в конечном счете должен быть вызван - непосредственно перед тем, как результат должен быть возвращен веб-сервером (сервлет/контроллер, которые не являются Rx в природе).
observable.toBlocking().forEach(i-> System.out.println(i));
Полный код Ниже
public static void main(String args[]){
Observable<Integer> observable = Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> observer) {
System.out.println("In OnSubscribe Create: "+observer.isUnsubscribed());
try {
if (!observer.isUnsubscribed()) {
for (int i = 1; i < 5; i++) {
observer.onNext(i);
}
observer.onCompleted();
}
} catch (Exception e) {
observer.onError(e);
}
}
});
observable.subscribe(new Subscriber<Integer>() {
@Override
public void onNext(Integer item) {
System.out.println("Next: " + item);
}
@Override
public void onError(Throwable error) {
System.err.println("Error: " + error.getMessage());
}
@Override
public void onCompleted() {
System.out.println("Sequence complete.");
}
});
observable.toBlocking().forEach(i-> System.out.println(i));
}
Выход
In OnSubscribe Create: false
Next: 1
Next: 2
Next: 3
Next: 4
Sequence complete.
In OnSubscribe Create: false
For Each Printing 1
For Each Printing 2
For Each Printing 3
For Each Printing 4
Отвечая на собственный вопрос: Так я обрабатывал результаты через Foreach ошибалась. Это само по себе является подписчиком и, следовательно, снова вызывает метод вызова. Правильный способ делать то, что я делал, было через CountDownLatch.
Это также разъясняет мое другое сомнение - в контексте наличия точки, в которой мы, наконец, дождаемся завершения работы непосредственно перед возвратом ответчика в контроллере/сервлете, означало бы использовать защелку с таймаутом.
Код ниже.
public static void main(String args[]) throws Exception {
CountDownLatch latch = new CountDownLatch(4);
Observable<Integer> observable = Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> observer) {
System.out.println("In OnSubscribe Create: "+observer.isUnsubscribed());
try {
if (!observer.isUnsubscribed()) {
for (int i = 1; i < 5; i++) {
observer.onNext(i);
latch.countDown();
}
observer.onCompleted();
}
} catch (Exception e) {
observer.onError(e);
}
}
});
observable.subscribe(new Subscriber<Integer>() {
@Override
public void onNext(Integer item) {
System.out.println("Next: " + item);
}
@Override
public void onError(Throwable error) {
System.err.println("Error: " + error.getMessage());
}
@Override
public void onCompleted() {
System.out.println("Sequence complete.");
}
});
latch.await();
System.out.println(" Wait for Latch Over");
}
Возможно, я ошибаюсь, но вы, как правило, побеждаете цель RxJava: если вы собираетесь использовать ее в блокирующем режиме, вы можете уйти с простой петлей и не нанести накладные расходы RxJava , – AndroidEx
@LordRaydenMK - Это единственный способ ответить на ваш собственный вопрос (или через комментарии), если вы новичок в StackOverflow. –
@AndroidEx - Извините, что я новичок в RX и выяснил это. Но предположим, что у нас есть гигантское монолитное приложение, работающее более десятилетия, я полагаю, мы будем медленно менять его для работы с RX. Кроме того, поскольку все еще не является реактивным в мире веб-приложений (приложения и веб-серверы), мы все равно будем иметь родительский поток, ожидающий завершения деятельности, прежде чем отвечать на запросы. –