2016-11-07 3 views
1

У меня есть следующий класс, который я держу в одноточечном:RxJava Тема переизлучит некорректный планировщик

public class SessionStore { 
    Subject<Session, Session> subject; 

    public SessionStore() { 
     subject = new SerializedSubject<>(BehaviorSubject.create(new Session()); 
    } 

    public void set(Session session) { 
     subject.onNext(session); 
    } 

    public Observable<UserSession> observe() { 
     return subject.distinctUntilChanged(); 
    } 
} 

В деятельности наблюдает сессию и выполнять работу сети при каждом изменении:

private Subscription init() { 
    return sessionStore 
      .observe() 
      .subscribeOn(Schedulers.io()) 
      .observeOn(AndroidSchedulers.mainThread()) 
      .flatMap(new Func1<Session, Observable<Object>>() { 
       @Override 
       public Observable<Object> call(Session session) { 
        return retrofitService.getAThing(); 
       } 
      }) 
      .subscribe(...); 
} 

Когда Я подписываюсь на хранилище сеансов, субъект испускает на io() сразу же, как это BehaviourSubject, а абонент выполняет по mainThread().

Проблема возникает, когда я звоню sessionStore.set(new AnotherSession()), но уже подписался на нее. IMO, это должно выполнить поток, определенный в init() в планировщике io(). Однако вместо этого происходит то, что поток выполняется в том же потоке, на который был вызван subject.onNext(). Результатом является NetworkOnMainThreadException, поскольку я выполняю сетевую операцию в flatMap().

Насколько я понял предметы? Я неправильно их использую? Какое правильное решение, пожалуйста?

Я также попытался заменить весь предметный подход на Observable.fromEmitter() в методе observe(), но удивительно, что выход был тем же самым.

ответ

4

заявление se, посмотрите на следующую часть из книги 'Reactive Programming with RxJava'

По умолчанию вызов onNext() в теме напрямую передается всем методам обратного вызова onNext() наблюдателя. Неудивительно, что эти методы имеют одно и то же имя. В некотором смысле вызов onNext() на Subject косвенно вызывает onNext() для каждого Подписчика.

Рекоммендовать: Если вы вызываете onNext на тему из Thread-1, он будет вызывать onNext для абонента из Thread-1. onSubscribe будет удалено.

Так первые вещи сначала: На какой нить подписки происходят на:

retrofitService.getAThing() 

Я просто предполагаю, и говорят, что это ссылающееся нить. Каким будет поток, описанный в watchOn, который является Android-UI-Loop.

Каждое значение под наблюдениемOn будет смещено от Thread-a к Thread-b, как указано планировщиком. Наблюдение должно быть на UI-Loop должно происходить прямо перед подпиской. Каждое значение, которое будет получено в подписке, будет на UI-Loop, которое не будет блокировать поток пользовательского интерфейса или конец в исключении.

Пиз взглянуть на пример кода и вывод:

class SessionStore { 
    private Subject<String, String> subject; 

    public SessionStore() { 
     subject = BehaviorSubject.create("wurst").toSerialized(); 
    } 

    public void set(String session) { 
     subject.onNext(session); 
    } 

    public Observable<String> observe() { 
     return subject 
       .asObservable() 
       .doOnNext(s -> System.out.println("Receiving value on Thread:: " + Thread.currentThread())) 
       .distinctUntilChanged(); 
    } 
} 

@Test 
public void name() throws Exception { 
    // init 
    SessionStore sessionStore = new SessionStore(); 

    TestSubscriber testSubscriber = new TestSubscriber(); 
    Subscription subscribe = sessionStore 
      .observe() 
      .flatMap(s -> { 
       return Observable.fromCallable(() -> { 
        System.out.println("flatMap Thread:: " + Thread.currentThread()); 
        return s; 
       }).subscribeOn(Schedulers.io()); 
      }) 
      .doOnNext(s -> System.out.println("After flatMap Thread:: " + Thread.currentThread())) 
      .observeOn(Schedulers.newThread()) // imagine AndroidScheduler here 
      .subscribe(testSubscriber); // Do UI-Stuff in subscribe 

    new Thread(() -> { 
     System.out.println("set on Thread:: " + Thread.currentThread()); 
     sessionStore.set("123"); 
    }).start(); 

    new Thread(() -> { 
     System.out.println("set on Thread:: " + Thread.currentThread()); 
     sessionStore.set("345"); 
    }).start(); 

    boolean b = testSubscriber.awaitValueCount(3, 3_000, TimeUnit.MILLISECONDS); 

    Assert.assertTrue(b); 
} 

Выход ::

Receiving value on Thread:: Thread[main,5,main] 
flatMap Thread:: Thread[RxIoScheduler-2,5,main] 
After flatMap Thread:: Thread[RxIoScheduler-2,5,main] 
set on Thread:: Thread[Thread-1,5,main] 
set on Thread:: Thread[Thread-0,5,main] 
Receiving value on Thread:: Thread[Thread-1,5,main] 
flatMap Thread:: Thread[RxIoScheduler-2,5,main] 
After flatMap Thread:: Thread[RxIoScheduler-2,5,main] 
Receiving value on Thread:: Thread[Thread-1,5,main] 
flatMap Thread:: Thread[RxIoScheduler-2,5,main] 
After flatMap Thread:: Thread[RxIoScheduler-2,5,main] 
+0

Спасибо, я не думал об определении планировщика для Observable, созданного внутри flatMap. Я думал, что если я определяю планировщик после плоской карты, он будет применяться ко всему потоку. – bakua

+0

Ну, это зависит от того, чего вы пытаетесь достичь. Вы могли бы использовать функцию watchOn перед FlatMap. Каждое значение вниз по цепочке было бы перемещено от точки наблюденияOn от потока X к другому. Но вы, указав subscribeOn на наблюдаемый в flatMap, вы достигли параллелизма. Таким образом, существует разница между onSubscribe/onObserve. OnObserve может размещаться несколько раз в одном наблюдаемом, чтобы переместить выбросы в другой поток. Для subscribeOn должен быть только один для наблюдаемого, потому что будет использоваться subscribeOn, который является самым верхним. –

+0

Принимая это во внимание, я тогда не понимаю, почему поток ведет себя по-разному, когда я удаляю 'subscribeOn()' изнутри 'flatMap()' и помещаю его на один шаг раньше. Подобно 'sessionStore.observe(). SubscribeOn(). FlatMap() ...' – bakua

0

Когда вы вызываете оператора, он влияет на весь нисходящий поток. Если вы звоните:

.observeOn(AndroidSchedulers.mainThread()) 

в неправильном месте, остальная часть потока выполняется по указанной теме.

Я предлагаю вам всегда добавить:

.subscribeOn(Schedulers.io()) 
.observeOn(AndroidSchedulers.mainThread()) 

в самом конце потока:

private Subscription init() { 
    return sessionStore 
      .observe() 
      .flatMap(new Func1<Session, Observable<Object>>() { 
       @Override 
       public Observable<Object> call(Session session) { 
        return retrofitService.getAThing(); 
       } 
      }) 
      .subscribeOn(Schedulers.io()) 
      .observeOn(AndroidSchedulers.mainThread()) 
      .subscribe(...); 
} 
+0

Спасибо. Несмотря на то, что я указываю планировщики в конце потока, проблема до него. – bakua

+0

'retrofitService.getAThing()' не указывает каких-либо планировщиков? –

+0

Nope. Также я упростил пример для лучшей читаемости. – bakua

0

Я думаю, что вы забываете, что ваш Subject также наблюдатель так, чтобы получить onNext для запуска на резьбе io try

public class SessionStore { 
    Subject<Session, Session> subject; 

    public UserSessionStore() { 
     subject = new SerializedSubject<>(BehaviorSubject.create(new Session())).observeOn(Schedulers.io()); 
    } 

    public void set(Session session) { 
     subject.onNext(session); 
    } 

    public Observable<UserSession> observe() { 
     return subject.distinctUntilChanged(); 
    } 
} 
+0

Я тоже пробовал это. Но добавив 'io()' after' distinctUntilChanged() '. Также я считаю, что вы имели в виду 'subscribeOn()', а не 'observOn()' и что конструктор 'SerializedSubject' принимает объект, а не наблюдаемый, поэтому вы не можете сделать это так, как вы писали. – bakua

+0

Да, это опечатка с моей стороны. 'ObservOn' должен находиться в' SerializedSubject', но он все равно должен быть 'observOn'. – JohnWowUs

Смежные вопросы