2015-07-03 8 views
3

Так что у меня этот код:RxJava Цепной Наблюдаемые и NetworkMainThreadException

public Observable<AbstractXMPPConnection> connect(final AbstractXMPPConnection connection) { 
    return Observable.<AbstractXMPPConnection>create(subscriber -> { 
     try { 
      AbstractXMPPConnection connection2 = connection.connect(); 
      if (connection2.isConnected()) { 
       subscriber.onNext(connection2); 
       subscriber.onCompleted(); 
      } 
     } catch (SmackException | IOException | XMPPException e) { 
      e.printStackTrace(); 
      subscriber.onError(e); 
     } 
    }) 
    .doOnError(throwable -> LOGI("111", "Connection OnError called")); 
} 


public Observable<AbstractXMPPConnection> connectWithRetry(final AbstractXMPPConnection connection) { 
     return connect(connection) 
       .retryWhen(attempts -> attempts.zipWith(Observable.range(1, MAX_CONNECTION_TRIES), (throwable, integer) -> new Pair<>(throwable, integer)) 
         .flatMap(pair -> { 
          if (pair.second == MAX_LOGIN_TRIES) 
           return Observable.error(pair.first); 
          return Observable.timer(pair.second, TimeUnit.SECONDS); 
         })); 
    } 


public void connect() { 
     assertTrue("To start a connection to the server, you must first call init() method!", 
       this.connectionConfig != null); 

     connectionHelper.connectWithRetry(connection) 
       .observeOn(Schedulers.newThread()) 
       .subscribeOn(AndroidSchedulers.mainThread()) 
       .subscribe(new Subscriber<AbstractXMPPConnection>() { 
        @Override 
        public void onCompleted() { 
        } 

        @Override 
        public void onError(Throwable e) { 
         LOGI(TAG, "ConnectionHelper Connection onError\n"); 

         /**{@link LoginActivity#onConnectionFailure(OnConnectionFailureEvent)} */ 
         MainApplication.getInstance().getBusInstance().post(new OnConnectionFailureEvent()); 
        } 

        @Override 
        public void onNext(AbstractXMPPConnection connection) { 
         LOGI(TAG, "ConnectionHelper Connection onNext"); 
//      onConnected(); 
        } 
       }); 
    } 

У меня есть несколько вопросов по поводу цепочки наблюдаемыми. Представьте себе этот сценарий, в котором у меня есть соединение Observable, которое иногда я использую, но я использую в основном connectWithRetry() Observable.

Мой вопрос, что произойдет, если добавили:

.observeOn(Schedulers.newThread()) 
.subscribeOn(AndroidSchedulers.mainThread()) 

как к connect() и connectWithRetry()? В этом случае, когда я вызываю public void connect и указываю планировщик, предыдущие игнорируются?

И почему я получаю NetworkOnMainThreadException? Явное сообщение observeOn(Schedulers.newThread()) есть, он не должен давать мне эту ошибку

ответ

1

В первую очередь я обращусь к вашей проблеме NetworkOnMainThread.

observeOn(Schedulers.newThread()) означает выход будет наблюдаться в новом потоке - то есть, код в ваш абонент (onComplete/Error/Next) будет работать на этой теме.

subscribeOn(AndroidSchedulers.mainThread()означает подписка будет происходить на главном потоке - код в созданном вами наблюдаемый (connection.connect() и т.д.) является то, что запускается при подписке происходит.

Так просто поменять планировщики:

.subscribeOn(Schedulers.io()) 
.observeOn(AndroidSchedulers.mainThread()) 

Так обратиться на ваш первый вопрос, они не игнорируются, они просто используются неправильно. Надеюсь, из этого вы можете увидеть, что произойдет, если вы переместите аналогичные вызовы в цепочку внутри ваших методов, которые возвращают наблюдаемые: ничего не отличается от того, что вы уже сделали. Звонки просто были бы в другом месте.

Итак, где разместить выбор планировщика? Это зависит от вас. Вы можете получить повышенную ясность по не имеющим subscribeOn вызова внутри методов для создания вашего наблюдаемые:

connectionHelper.connectWithRetry(connection) 
    .subscribeOn(Schedulers.io()) 
    .observeOn(AndroidSchedulers.mainThread()) 

Однако, если вы чувствуете, как вы вызываете это везде без всякой причины, вы можете вместо этого переместить subscribeOn звонить внутри ваших методов:

return connect(connection) 
      .retryWhen(...) 
      .flatMap(...) 
      .subscribeOn(Schedulers.io()) 
      .observeOn(AndroidSchedulers.mainThread()); 

Обратите внимание, что они не должны быть объединены вместе, как это - вы можете subscribeOn внутри метода, но оставить observeOn до любых вызывающих абонентов, которые хотят, чтобы их результаты на конкретном Thr Свинец.

+0

ОК, так что рассматриваю, что я задаю два планировщика для каждого наблюдаемого, а третий - с цепочкой, это неправильно? Или должен ли я просто указать планировщик только на общедоступном void connect? плохо согласитесь с вашим ответом, спасибо за помощь –

+0

Я обновил ответ несколькими примерами - это действительно зависит от вас, где вы их положили! Хотя я не уверен, что вы имеете в виду о третьем? –

+0

ok Спасибо за дополнительную информацию, я ценю это: P im мое мнение, имена плохо выбраны. Я имею в виду наблюдателя, наблюдателя, подписчика, осуществляющего Observer. Его вроде, subscribeOn - главная проблема, потому что, рассматривая, когда вы подписываетесь на наблюдаемый, вы указываете, что хотите делать с результатами, subscribeOn вводит нас в заблуждение, чтобы «в каком потоке вы хотите делать что-то с результатами». Imho –

0

Пожалуйста, попробуйте Schedulers.io() может возникнуть проблема.

Смежные вопросы