2016-03-11 2 views
0

Привет, я использую RxJava для работы с дисковым хранилищем и выполняю операции. В принципе у меня есть метод, как это:Подождите, пока предыдущий Rx не будет завершен

public Observable<String> getStorageItem(String id, String type) { 
    return Observable.defer(new Func0<Observable<String>>() { 
     // Run db operations to get storage item. 
    } 
} 

Проблема заключается в том, что это возможно, этот метод getStorageItem(...) получает подписку на несколько раз подряд. А операции DB в пределах наблюдаемого не могут выполняться одновременно. Какой у меня лучший вариант? Должен ли я вручную создать некоторую сортировку очереди? Или у RxJava есть какой-то инструмент, который позволяет мне блокировать операцию до тех пор, пока не будет завершена предыдущая?

+0

На самом деле, я думаю, я могу просто решить это с помощью синхронизированной блокировки во всех моих операциях Db. Дурак я. Но хотелось бы услышать, есть ли у кого-либо другие мнения по этому поводу. – clu

+0

Вы можете посмотреть и посмотреть, соответствует ли 'flatMap (Func1, int)' вашим потребностям (http://reactivex.io/RxJava/javadoc/rx/Observable.html#flatMap(rx.functions.Func1,%20int)) – mewa

ответ

2

Вы можете использовать subscribeOn с однопоточными планировщиками, созданных из ExecutorService, чтобы убедиться, что есть только одна операция БД в процессе выполнения:

ExecutorService exec = Schedulers.newSingleThreadExecutor(); 
Scheduler s = Schedulers.from(exec); 

public Observable<String> getStorageItem(String id, String type) { 
    return Observable.fromCallable(() -> { 
     // Do DB operations 
    }); 
} 

getStorageItem("1", "2").subscribeOn(s).subscribe(...); 
getStorageItem("2", "4").subscribeOn(s).subscribe(...); 
getStorageItem("3", "6").subscribeOn(s).subscribe(...); 

Но обратите внимание, что при перемещении вычисления от нити вызывающего абонента, его может выполняться в любое время. Если вам нужно ждать его индивидуально (потому что getStorageItem уже вызван в какой-то поток), вы можете применить toBlocking() после subscribeOn.

Смежные вопросы