У меня есть система, состоящая из нескольких взаимосвязанных компонентов. Все работает хорошо, но через некоторое время несколько наблюдателей перестают получать предметы, отправленные наблюдаемым onNext().RxJava - Наблюдатели не получают onNext() элементы через некоторое время
упрощенный сценарий, как это: у меня есть Component1.start()
-> создает ConnectableObservable с Observable.create(...).subscribeOn().observeOn().publish()
и выписывает COMPONENT2. После этого соедините() s. Этот наблюдаемый испускает некоторые элементы в цикле, а затем вызывает s.onComplete()
, когда он заканчивается.
Компонент2 реализует наблюдателя. Кроме того, он имеет ConnectableObservable, который запускает цикл while (true). Когда он получает значение в своем onNext(), вызываемом Component1, он уведомляет Component0, используя собственный ConnectableObservable. (ПРИМЕЧАНИЕ. Я также реализовал их с помощью PublishSubject, и это происходит).
Component1.start() //Creates Component1's ConnectableObservable, subscribes Component2 and starts running with connect();
Component1.connectableObservable -> onNext() ---> Component2
Component2.connectableObservable -> onNext() ---> Component0
Когда Component0.onNext() получает конкретный элемент (после 100 итераций), он останавливает Component1.observable, что делает его выход из его петли и вызвать OnComplete().
Через некоторое время Component0 вызывает Component1.start(), и все начинается снова.
То, что я видел в том, что, когда все нормальны Component1.observable.onNext() вызывает rx.internal.operators.OperatorSubscribeOn.......subscriber.onNext()
rx.internal.operators.OperatorSubscribeOn
@Override
public void call(final Subscriber<? super T> subscriber) {
final Worker inner = scheduler.createWorker();
subscriber.add(inner);
inner.schedule(new Action0() {
@Override
public void call() {
final Thread t = Thread.currentThread();
Subscriber<T> s = new Subscriber<T>(subscriber) {
@Override
public void onNext(T t) {
subscriber.onNext(t);
subscriber.onNext() является внутренним классом private static final class ObserveOnSubscriber<T>
и здесь заканчивается вызывающее расписание():
@Override
public void onNext(final T t) {
if (isUnsubscribed() || finished) {
return;
}
if (!queue.offer(on.next(t))) {
onError(new MissingBackpressureException());
return;
}
schedule();
}
график() является
protected void schedule() {
if (counter.getAndIncrement() == 0) {
recursiveScheduler.schedule(this);
}
}
счетчик равен 0, поэтому вызывается recursiveScheduler.schedule(this);
, а Component2 получает предмет.
Теперь, , когда он перестает работать, что происходит, это то, что счетчик больше не 0, фактически каждый звонок увеличивает его. Таким образом, recursiveScheduler.schedule(this);
никогда не вызывается, а Component2 ничего не получает.
Что может быть причиной этого? Почему счетчик 0 и в какой-то момент начинает увеличиваться?
UPDATE: Копание в исходном коде я видел следующее: после того, как график() вызывается, есть запланированная задача, которая вызывает код, приведенный ниже, уменьшение счетчика, когда элементы не были пропущены:
private static final class ObserveOnSubscriber<T> extends Subscriber<T> implements Action0 {
// only execute this from schedule()
@Override
public void call() {
...
emitted = currentEmission;
missed = counter.addAndGet(-missed);
if (missed == 0L) {
break;
}
В соответствии с этим, поскольку элементы пропущены, счетчик увеличивается, а затем последующие элементы также пропущены.
В чем причина того, почему предметы пропущены?
Я заметил что-то странное. Если я удалю какие-либо из других (не упомянутых) наблюдаемых из программы, никакие предметы не будут пропущены. Они имеют Component0 в качестве наблюдателя и производят свои объекты в своем потоке subscribeOn(), поэтому я не вижу, как они влияют на этот сценарий.
ОБНОВЛЕНИЕ 2: Я продолжал пытаться выяснить, что происходит. Когда я Component1.connectableObservable.connect()
, он заканчивает тем, что назвал private static final class ObserveOnSubscriber<T> extends Subscriber<T> implements Action0 -> init()
Здесь графика() называется:
void init() {
// don't want this code in the constructor because `this` can escape through the
// setProducer call
Subscriber<? super T> localChild = child;
localChild.setProducer(new Producer() {
@Override
public void request(long n) {
if (n > 0L) {
BackpressureUtils.getAndAddRequest(requested, n);
schedule();
Правильные оставляет OperatorObserveOn.counter = 0 после того, как график(). Когда он больше не работает, scheduler() увеличивает +1 значение OperatorObserveOn.counter.
Привет Пол, как я могу создать отписки слушателя для класса, реализующего Obsever? – codependent
Посмотрите на последний пример теста здесь https://github.com/politrons/reactive/blob/master/src/test/java/rx/observables/creating/ObservableSubscription.java – paul