2016-06-26 3 views
0

В моем приложении для Android я использую интерфейс репозитория домена, который поддерживается локальным БД, реализованным с использованием SqlBrite и сетевого api с возможностью обновления. Поэтому у меня есть метод getDomains(): Observable<List<Domain>> в репозитории и два соответствующих метода в моей модификации и SqlBrite. Я не хочу объединяться или сливаться, или амбировать эти две наблюдаемые. Я хочу, чтобы мой репозиторий получал данные только из SqlBrite, и поскольку SqlBrite возвращает QueryObservable, который запускает onNext() каждый раз, когда базовые данные меняются, я могу самостоятельно выполнить свой сетевой запрос и сохранить результаты в SqlBrite и обновить Observable с извлечением из сети и сохранить в DB данные. Так что я пытался реализовать getDomains() метод моего репозитория следующим образом:RxJava поток данных с SqlBrite и дооснащением

fun getDomains(): Observable<List<Domain>> { 
    return db.getDomains() 
        .doOnSubscribe { 
         networkClient.getDomains() 
            .doOnNext { db.putDomains(it) } 
            .onErrorReturn{ emptyList() } 
            .subscribe() 
        } 
} 

Но в этом случае каждый раз, когда клиент должен подписаться, каждый раз, когда он будет делать сетевые запросы, которые не так хорошо. Я думал о других операторах do..., чтобы пересылать запросы там, но doOnCompleted() в случае QueryObservable никогда не будет вызван, пока я не позвоню toBlocking() где-то, чего я не буду, doOnEach() тоже не очень хорошо, поскольку он делает запросы каждый раз, когда элемент из db извлечен. Я также пытался использовать оператор replay(), но, несмотря на то, что в этом случае кэшированный Observable, подписка происходит и приводит к сетевым запросам. Итак, как можно объединить эти два Наблюдения в желаемом порядке?

ответ

1

Хорошо, это зависит от конкретного используемого вами случая: т.е. если вы хотите отображать последние данные из своей локальной базы данных и время от времени обновлять базу данных, выполняя сетевой запрос в фоновом режиме.

Может быть, есть лучший способ, но, может быть, вы могли бы сделать что-то вроде этого

fun <T> createDataAwareObservable(databaseQuery: Observable<T>): Observable<T> = 
     stateDeterminer.getState().flatMap { 
     when (it) { 
      State.UP_TO_DATE -> databaseQuery // Nothing to do, data is up to date so observable can be returned directly 

      State.NO_DATA -> 
      networkClient.getDomains() // no data so first do the network call 
       .flatMap { db.save(it) } // save network call result in database 
       .flatMap { databaseQuery } // continue with original observable 

      State.SYNC_IN_BACKGROUND -> { 
      // Execute sync in background 
      networkClient.getDomains() 
       .flatMap { db.save(it) } 
       .observeOn(backgroundSyncScheduler) 
       .subscribeOn(backgroundSyncScheduler) 
       .subscribe({}, { Timber.e(it, "Error when starting background sync") }, {}) 

      // Continue with original observable in parallel, network call will then update database and thanks to sqlbrite databaseQuery will be update automatically 
      databaseQuery 
      } 
     } 
     } 

Так в конце вы создаете SQLBrite Observable (QueryObservable) и передать его в функцию createDataAwareObservable(). Затем он будет обеспечивать, чтобы он загружал данные из сети, если здесь нет данных, иначе он будет проверять, должны ли данные обновляться в фоновом режиме (сохранит их в базе данных, а затем автоматически обновит SQLBrite QueryObservable) или если данные своевременно.

В принципе вы можете использовать его как это:

createDataAwareObservable(db.getAllDomains()).subscribe(...) 

Так что для вас, как пользователя этого createDataAwareObservable() вы всегда получите тот же тип Observable<T> назад, как вы передаете в качестве параметра. По существу, кажется, что вы всегда подписывались на db.getAllDomains() ...

+0

, откуда должно прибыть государство? –

+0

Это деталь реализации в зависимости от вашего варианта использования. 'stateDeterminer.getState()' возвращает 'Observable '. это может быть проверка, если база данных пуста, а затем возвращает 'State.NO_DATA' или элемент времени (т. е. возврат' State.SYNC_IN_BACKGROUND' будет возвращаться каждые 24 часа, чтобы данные обновлялись один раз в день) и так далее , Это действительно зависит от того, что вы хотите достичь, и как часто и когда вы хотите синхронизировать, выполняя сетевой вызов. – sockeqwe

+0

имеет смысл. Я хочу получать свежие данные каждый раз, когда пользователи запускают приложение и «getDomains» вызывается в первый раз, а затем в течение некоторого времени, например. 120 секунд, когда звонки subscribe() я хочу вернуть только из db. Но если пользователи снижают список для обновления, я хочу перезагрузить данные из сети –

0

если ваша проблема в том, что вы должны подписаться ваш наблюдатель каждый раз, когда вы хотите получить данные, которые вы можете использовать реле, которые никогда не отписываться наблюдателей, поскольку не реализует OnComplete

/** 
* Relay is just an observable which subscribe an observer, but it wont unsubscribe once emit the items. So the pipeline keep open 
* It should return 1,2,3,4,5 for first observer and just 3, 4, 5 fot the second observer since default relay emit last emitted item, 
* and all the next items passed to the pipeline. 
*/ 
@Test 
public void testRelay() throws InterruptedException { 
    BehaviorRelay<String> relay = BehaviorRelay.create("default"); 
    relay.subscribe(result -> System.out.println("Observer1:" + result)); 
    relay.call("1"); 
    relay.call("2"); 
    relay.call("3"); 
    relay.subscribe(result -> System.out.println("Observer2:" + result)); 
    relay.call("4"); 
    relay.call("5"); 
} 

Другие примеры здесь https://github.com/politrons/reactive/blob/master/src/test/java/rx/relay/Relay.java

+0

Нет. Это не моя проблема. Это нормально отказаться от подписки и подписки во время жизненного цикла приложения. Моя проблема в том, что я не хочу выполнять сетевой запрос каждый раз, когда я подписываюсь. Если мне нужно будет работать с сетью напрямую, я бы использовал оператор 'replay (120, TimeUnit.Seconds)'. Но я работаю с db. Я хочу подписаться на него, но выполнять сетевой запрос не каждый раз, когда я подписываюсь –

+0

Пробовал ли вы оператор кеша? – paul

+0

Я пытался использовать оператор 'replay()' в своей сети наблюдаемый –

Смежные вопросы