Так что у меня этот код:RxJava Цепной Наблюдаемые и NetworkMainThreadException
public Observable<AbstractXMPPConnection> connect(final AbstractXMPPConnection connection) {
return Observable.<AbstractXMPPConnection>create(subscriber -> {
try {
AbstractXMPPConnection connection2 = connection.connect();
if (connection2.isConnected()) {
subscriber.onNext(connection2);
subscriber.onCompleted();
}
} catch (SmackException | IOException | XMPPException e) {
e.printStackTrace();
subscriber.onError(e);
}
})
.doOnError(throwable -> LOGI("111", "Connection OnError called"));
}
public Observable<AbstractXMPPConnection> connectWithRetry(final AbstractXMPPConnection connection) {
return connect(connection)
.retryWhen(attempts -> attempts.zipWith(Observable.range(1, MAX_CONNECTION_TRIES), (throwable, integer) -> new Pair<>(throwable, integer))
.flatMap(pair -> {
if (pair.second == MAX_LOGIN_TRIES)
return Observable.error(pair.first);
return Observable.timer(pair.second, TimeUnit.SECONDS);
}));
}
public void connect() {
assertTrue("To start a connection to the server, you must first call init() method!",
this.connectionConfig != null);
connectionHelper.connectWithRetry(connection)
.observeOn(Schedulers.newThread())
.subscribeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<AbstractXMPPConnection>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
LOGI(TAG, "ConnectionHelper Connection onError\n");
/**{@link LoginActivity#onConnectionFailure(OnConnectionFailureEvent)} */
MainApplication.getInstance().getBusInstance().post(new OnConnectionFailureEvent());
}
@Override
public void onNext(AbstractXMPPConnection connection) {
LOGI(TAG, "ConnectionHelper Connection onNext");
// onConnected();
}
});
}
У меня есть несколько вопросов по поводу цепочки наблюдаемыми. Представьте себе этот сценарий, в котором у меня есть соединение Observable, которое иногда я использую, но я использую в основном connectWithRetry()
Observable.
Мой вопрос, что произойдет, если добавили:
.observeOn(Schedulers.newThread())
.subscribeOn(AndroidSchedulers.mainThread())
как к connect()
и connectWithRetry()
? В этом случае, когда я вызываю public void connect и указываю планировщик, предыдущие игнорируются?
И почему я получаю NetworkOnMainThreadException
? Явное сообщение observeOn(Schedulers.newThread())
есть, он не должен давать мне эту ошибку
ОК, так что рассматриваю, что я задаю два планировщика для каждого наблюдаемого, а третий - с цепочкой, это неправильно? Или должен ли я просто указать планировщик только на общедоступном void connect? плохо согласитесь с вашим ответом, спасибо за помощь –
Я обновил ответ несколькими примерами - это действительно зависит от вас, где вы их положили! Хотя я не уверен, что вы имеете в виду о третьем? –
ok Спасибо за дополнительную информацию, я ценю это: P im мое мнение, имена плохо выбраны. Я имею в виду наблюдателя, наблюдателя, подписчика, осуществляющего Observer. Его вроде, subscribeOn - главная проблема, потому что, рассматривая, когда вы подписываетесь на наблюдаемый, вы указываете, что хотите делать с результатами, subscribeOn вводит нас в заблуждение, чтобы «в каком потоке вы хотите делать что-то с результатами». Imho –