2016-12-09 3 views
0

поэтому сначала я покажу вам, что у меня есть и то, что я думаю, что там происходит:Run doOnNex на Ja BehaviorSubject в определенной теме

У меня есть BehavourSubject<DataObject>:

private BehaviorSubject<DataObject> dataSubject = BehaviorSubject.create(); 

я вернуть его обратно в определенный функция, которая выглядит следующим образом:

public Observable<DataObject> pendingData() { 
    return this.dataSubject.asObservable() 
     .doOnNext(data -> { 
      // do something with this data that has to be thread save. 
     }) 
     .observeOn(AndroidSchedulers.mainThread()); 
} 

что я предполагаю, что случается, что doOnNext часть будет работать в том же потоке, что this.dataSubject.onNext(data); называется. Но поскольку я делаю что-то, что нужно для сохранения потока в этой лямбда, я должен либо поместить его в семафор, либо выполнить все действия doOnNext в определенном потоке.

Моя первая идея - это «нормальный способ обработки потоков в rx», но я не знаю, как это работает. Я думал, чтобы добавить subscribeOn(certainBackgroundScheduler) к наблюдаемым, как это:

public Observable<DataObject> pendingData() { 
    return this.dataSubject.asObservable() 
     .doOnNext(data -> { 
      // do something with this data that has to be thread save. 
     }) 
     .subscribeOn(certainBackgroundScheduler) 
     .observeOn(AndroidSchedulers.mainThread()); 
} 

Но когда я создаю наблюдаемом с подписки блока, то этот блок работает в этом backgroundScheduler. Когда я вызываю onNext на подписчика, я вызываю его в том потоке, который является логическим, но является ли он таким же в BehaviorSubject?

Действительно ли это так просто? Если нет, как заставить объект запускать блок doOnNext в моем определенномThread?

+0

И когда вы пытаетесь, не работает ли это? – weston

ответ

1

Действительно ли это так просто? Если нет, как я могу заставить объект запускать блок doOnNext в моем определенномThread?

Да, это так. Прочтите docs о subscribeOn и observeOn для уточнения.

+0

Итак, блок doOnNext выполняется в потоке, который передается методом 'observOn', а не методом' subscribeOn'? –

+0

Я был немного не прав. Поэтому в вашем последнем фрагменте кода '.doOnNext' выполняется на' definBackgroundScheduler'. Вы можете легко проверить это с помощью метода Thread.currentThread(). Итак, как говорит Док, по умолчанию ваша работа наблюдается на планировщике, который вы подписали. –

+0

Вы можете отредактировать свой ответ так, чтобы он был правильным? –

2

В вашей цепочке разрешено иметь несколько observeOn, которые позволяют вам маршрутизировать значения между различными «местоположениями» выполнения.

dataSubject 
.observeOn(backgroundScheduler) 
.doOnNext(v -> /* this will run on another scheduler. */) 
.observeOn(AndroidSchedulers.mainThread()) 
.doOnNext(v -> /* this will run on main after the previous */) 
Смежные вопросы