2016-01-19 3 views
6

Иногда, когда я отлаживаю свое приложение, я сталкиваюсь с InterruptedException в RxCachedThreadScheduler-1. Вот след: InterruptedException в потоке кеша RxJava при отладке на Android

Fatal Exception: java.lang.InterruptedException 
     at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.reportInterruptAfterWait(AbstractQueuedSynchronizer.java:1991) 
     at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2025) 
     at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1048) 
     at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:776) 
     at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1035) 

У меня есть настраиваемое представление, в котором я подписаться на мой наблюдаемый, как это:

@Override 
protected void onAttachedToWindow() { 
    super.onAttachedToWindow(); 

    sub = FollowHandler.getInstance().getObservable() 
      .filter(new Func1<FollowEvent, Boolean>() { 
       @Override 
       public Boolean call(FollowEvent followEvent) { 
        if(followEvent == null || followEvent.user == null 
          || user == null) 
         return false; 

        return followEvent.user.id == user.id; 
       } 
      }) 
      .observeOn(AndroidSchedulers.mainThread()) 
      .subscribe(new Observer<FollowEvent>() { 
       @Override 
       public void onCompleted() {} 

       @Override 
       public void onError(Throwable e) {} 

       @Override 
       public void onNext(FollowEvent followEvent) { 
        reactToThisNiceEvent(followEvent); 
       } 
      }); 
} 

@Override 
protected void onDetachedFromWindow() { 
    super.onDetachedFromWindow(); 

    if(sub != null) 
     sub.unsubscribe(); 
} 

Вот наблюдаемый:

eventSubject.asObservable() 
     .observeOn(Schedulers.io()) 
     .doOnNext(new Action1<FollowEvent>() { 
      @Override 
      public void call(FollowEvent followEvent) { 
       if(followEvent != null) 
        doSomethingNice(followEvent); 
      } 
     }) 
     .share(); 

, в котором eventSubject является простой PublishSubject. Я использую RxAndroid 1.1.0 наряду с RxJava 1.1.0.

Кто-нибудь знает, почему это происходит?

ответ

0

Я не уверен, почему это происходит, но попытаться сделать это:

sub = FollowHandler.getInstance().getObservable() 
      .filter(...) 
      .subscribeOn(Schedulers.io()) // <<<<<<<<<< 
      .observeOn(AndroidSchedulers.mainThread()) 
      .subscribe(...); 

Кроме того, я думаю, вам не нужно share():

eventSubject.asObservable() 
     .doOnNext(...) 
     .subscribeOn(Schedulers.io()) // <<<<< subscribeOn instead of observeOn, but actually, you don't need it here... 
     .share();  // <<<<< remove it 

Я использую Subject в eventbus очень часто, как я описал выше. И у меня никогда не было таких проблем.

P.S. В onDetachedFromWindow() было бы лучше, если вы проверите подписку о подписке или нет. Я знаю, что этот метод вызывается в основном потоке, и одновременный доступ к этому Subscription невозможно, но я думаю, что это хороший стиль:

if(sub != null && !sub.isUnsubscribed()) 
     sub.unsubscribe(); 
+0

Спасибо за ответ. Я попытаюсь выполнить подписку и наблюдать, как вы предложили. Однако для меня это может занять некоторое время. О доле, мне это нужно. Поскольку задачи перед общим доступом должны быть разделены между всеми подписчиками (и результат одинаковый для всех абонентов). Если я удалю share, doOnNext будет вызван для всех подписчиков (что бессмысленно). И спасибо за исправление для проверки isUnsubscribed(). Я полностью пропустил это. – Avsector

Смежные вопросы