2016-08-21 5 views
0

Существует, как я сохранить данные с RxJava:SQLite сделки и RxJava

override fun put(note: Note): Observable<Note> { 
    validateNote(note) 
    return Observable.just(note) 
      .doOnNext { dbKeeper.startTransaction() } 
      .doOnNext { storeHashtags(note) } 
      .doOnNext { storeImages(note) } 
      .flatMap { notesDataStore.put(notesMapper.transform(note)) } 
      .map { notesMapper.transform(it) } 
      .doOnNext { dbKeeper.setTransactionSuccessful() } 
      .doOnUnsubscribe { dbKeeper.endTransaction() } 
} 

И тогда я использую этот метод, как это:

 notesManager.put(note) 
      .switchMap { notesManager.getHashtags() } 
      .subscribeOn(Schedulers.io()) 
      .observeOn(AndroidSchedulers.mainThread()) 
      .subscribe {view.setHashtags(it) } 

И doOnUnsubscribe никогда не называется getHashtags() попытке выбрать из db, который заблокирован startTransaction(). Тупик, хе. Хорошо. Давайте заменим doOnUnsubscribe(...) на doOnTerminate(...).

override fun put(note: Note): Observable<Note> { 
    validateNote(note) 
    return Observable.just(note) 
      .doOnNext { dbKeeper.startTransaction() } 
      .doOnNext { storeHashtags(note) } 
      .doOnNext { storeImages(note) } 
      .map { notesMapper.transform(note) } 
      .flatMap { notesDataStore.put(it) } 
      .map { notesMapper.transform(it) } 
      .doOnNext { dbKeeper.setTransactionSuccessful() } 
      .doOnTerminate { dbKeeper.endTransaction() } 
} 

Но теперь сделка не закроется, если Observable будет прервана subscriber.unsubscribe().

Что вы можете посоветовать для решения моей проблемы?

Дополнительная информация: Я использую один экземпляр writableDb для записи/чтения данных.

ответ

1

Я бы сказал, что это не подходит для Rx; Моя рекомендация будет делать транзакцию с использованием обычной примерки, наконец (простите за Java):

Observable<Note> put(Note note) { 
    return just(note) 
     .doOnNext(n -> { 
      validateNote(n); 
      dbKeeper.startTransaction(); 
      try { 
       storeHashtags(n); 
       storeImages(n); 
       notesDataStore.put(notesMapper.transform(n)); 
       dbKeeper.setTransactionSuccessful(); 
      } finally { 
       dbKeeper.endTransaction(); 
      } 
     }); 
} 

Edit: Может быть, это более полезно:

fun <T> withTransaction(sourceObservable: Observable<T>): Observable<T> { 
    val latch = AtomicInteger(1) 
    val maybeEndTransaction = Action0 { 
     if (latch.decrementAndGet() == 0) { 
      endTransaction() 
     } 
    } 
    return Observable.empty<T>() 
      .doOnCompleted { startTransaction() } 
      .mergeWith(sourceObservable) 
      .doOnNext { setTransactionSuccessful() } 
      .doOnTerminate(maybeEndTransaction) 
      .doOnUnsubscribe(maybeEndTransaction) 
} 

Используйте это так:

override fun put(note: Note): Observable<Note> { 
    return Observable.just(note) 
     .doOnNext { validateNote(note) } 
     .doOnNext { storeHashtags(note) } 
     .doOnNext { storeImages(note) } 
     .flatMap { notesDataStore.put(notesMapper.transform(note)) } 
     .map { notesMapper.transform(it) } 
     .compose { withTransaction } 
} 

Это гарантирует, что транзакция закончится ровно один раз; просто будьте осторожны, чтобы вы не переключали потоки в своей исходной наблюдаемой цепочке (если только ваша транзакция amanger не может справиться с этим, или вы изменили свои планировщики для связывания новых потоков с существующими транзакциями).

+0

Да, я уже понял, что Rx не подходит для этого случая. Но уже слишком поздно менять архитектуру проекта. Спасибо за ваше решение. Возможно, я буду использовать 'BlockingObservable' для' notesDataStore.put() 'и подобных методов. – Alexandr

+1

Я добавил редактирование. –

+0

Выглядит интересно, я попробую. Благодаря! – Alexandr

Смежные вопросы