2016-06-08 3 views
0

У меня есть система, состоящая из нескольких взаимосвязанных компонентов. Все работает хорошо, но через некоторое время несколько наблюдателей перестают получать предметы, отправленные наблюдаемым onNext().RxJava - Наблюдатели не получают onNext() элементы через некоторое время

упрощенный сценарий, как это: у меня есть Component1.start() -> создает ConnectableObservable с Observable.create(...).subscribeOn().observeOn().publish() и выписывает COMPONENT2. После этого соедините() s. Этот наблюдаемый испускает некоторые элементы в цикле, а затем вызывает s.onComplete(), когда он заканчивается.

Компонент2 реализует наблюдателя. Кроме того, он имеет ConnectableObservable, который запускает цикл while (true). Когда он получает значение в своем onNext(), вызываемом Component1, он уведомляет Component0, используя собственный ConnectableObservable. (ПРИМЕЧАНИЕ. Я также реализовал их с помощью PublishSubject, и это происходит).

Component1.start() //Creates Component1's ConnectableObservable, subscribes Component2 and starts running with connect(); 

Component1.connectableObservable -> onNext() ---> Component2 

Component2.connectableObservable -> onNext() ---> Component0 

Когда Component0.onNext() получает конкретный элемент (после 100 итераций), он останавливает Component1.observable, что делает его выход из его петли и вызвать OnComplete().

Через некоторое время Component0 вызывает Component1.start(), и все начинается снова.

То, что я видел в том, что, когда все нормальны Component1.observable.onNext() вызывает rx.internal.operators.OperatorSubscribeOn.......subscriber.onNext()

rx.internal.operators.OperatorSubscribeOn 

@Override 
public void call(final Subscriber<? super T> subscriber) { 
    final Worker inner = scheduler.createWorker(); 
    subscriber.add(inner); 

    inner.schedule(new Action0() { 
     @Override 
     public void call() { 
      final Thread t = Thread.currentThread(); 

      Subscriber<T> s = new Subscriber<T>(subscriber) { 
       @Override 
       public void onNext(T t) { 
        subscriber.onNext(t); 

subscriber.onNext() является внутренним классом private static final class ObserveOnSubscriber<T> и здесь заканчивается вызывающее расписание():

@Override 
public void onNext(final T t) { 
    if (isUnsubscribed() || finished) { 
     return; 
    } 
    if (!queue.offer(on.next(t))) { 
     onError(new MissingBackpressureException()); 
     return; 
    } 
    schedule(); 
} 

график() является

protected void schedule() { 
    if (counter.getAndIncrement() == 0) { 
     recursiveScheduler.schedule(this); 
    } 
} 

счетчик равен 0, поэтому вызывается recursiveScheduler.schedule(this);, а Component2 получает предмет.

Теперь, , когда он перестает работать, что происходит, это то, что счетчик больше не 0, фактически каждый звонок увеличивает его. Таким образом, recursiveScheduler.schedule(this); никогда не вызывается, а Component2 ничего не получает.

Что может быть причиной этого? Почему счетчик 0 и в какой-то момент начинает увеличиваться?

UPDATE: Копание в исходном коде я видел следующее: после того, как график() вызывается, есть запланированная задача, которая вызывает код, приведенный ниже, уменьшение счетчика, когда элементы не были пропущены:

private static final class ObserveOnSubscriber<T> extends Subscriber<T> implements Action0 { 

    // only execute this from schedule() 
    @Override 
    public void call() { 
     ... 
      emitted = currentEmission; 
      missed = counter.addAndGet(-missed); 
      if (missed == 0L) { 
       break; 
      } 

В соответствии с этим, поскольку элементы пропущены, счетчик увеличивается, а затем последующие элементы также пропущены.

В чем причина того, почему предметы пропущены?

Я заметил что-то странное. Если я удалю какие-либо из других (не упомянутых) наблюдаемых из программы, никакие предметы не будут пропущены. Они имеют Component0 в качестве наблюдателя и производят свои объекты в своем потоке subscribeOn(), поэтому я не вижу, как они влияют на этот сценарий.

ОБНОВЛЕНИЕ 2: Я продолжал пытаться выяснить, что происходит. Когда я Component1.connectableObservable.connect(), он заканчивает тем, что назвал private static final class ObserveOnSubscriber<T> extends Subscriber<T> implements Action0 -> init()

Здесь графика() называется:

void init() { 
      // don't want this code in the constructor because `this` can escape through the 
      // setProducer call 
      Subscriber<? super T> localChild = child; 

      localChild.setProducer(new Producer() { 

       @Override 
       public void request(long n) { 
        if (n > 0L) { 
         BackpressureUtils.getAndAddRequest(requested, n); 
         schedule(); 

Правильные оставляет OperatorObserveOn.counter = 0 после того, как график(). Когда он больше не работает, scheduler() увеличивает +1 значение OperatorObserveOn.counter.

ответ

0

Получите наблюдателя за каждую подписку и создайте слушателя, чтобы рассказать вам, когда один наблюдатель отписывается от наблюдаемого, тогда вы можете понять, почему это происходит.

В любом случае я бы посмотрел на Реле, так как вам не нужно отписывать свои наблюдаемые, это более безопасно, и вы можете быть уверены, что он никогда не прекратит излучать события.

Взгляните на этот пример.

https://github.com/politrons/reactive/blob/master/src/test/java/rx/relay/Relay.java

+0

Привет Пол, как я могу создать отписки слушателя для класса, реализующего Obsever? – codependent

+0

Посмотрите на последний пример теста здесь https://github.com/politrons/reactive/blob/master/src/test/java/rx/observables/creating/ObservableSubscription.java – paul

Смежные вопросы