У меня есть наблюдаемый, который я создаю со следующим кодом.Пользовательский повтор Rxjava, наблюдаемый с оператором .cache?
Observable.create(new Observable.OnSubscribe<ReturnType>() {
@Override
public void call(Subscriber<? super ReturnType> subscriber) {
try {
if (!subscriber.isUnsubscribed()) {
subscriber.onNext(performRequest());
}
subscriber.onCompleted();
} catch (Exception e) {
subscriber.onError(e);
}
}
});
performRequest()
будет выполнять длительные задачи, как можно было бы ожидать.
Теперь, так как я мог бы запускать же Observable дважды или более в очень короткий промежуток времени, я решил написать такой трансформатор:
protected Observable.Transformer<ReturnType, ReturnType> attachToRunningTaskIfAvailable() {
return origObservable -> {
synchronized (mapOfRunningTasks) {
// If not in maps
if (! mapOfRunningTasks.containsKey(getCacheKey())) {
Timber.d("Cache miss for %s", getCacheKey());
mapOfRunningTasks.put(
getCacheKey(),
origObservable
.doOnTerminate(() -> {
Timber.d("Removed from tasks %s", getCacheKey());
synchronized (mapOfRunningTasks) {
mapOfRunningTasks.remove(getCacheKey());
}
})
.cache()
);
} else {
Timber.d("Cache Hit for %s", getCacheKey());
}
return mapOfRunningTasks.get(getCacheKey());
}
};
}
Который в основном ставит оригинальные .cache
наблюдаемые в HashMap<String, Observable>
.
Это в основном запрещает несколько запросов с одинаковыми getCacheKey()
(пример login
) для вызова performRequest()
в параллель. Вместо этого, если второй запрос login
поступает в то время, когда выполняется другой, второй запрос наблюдается «отбрасывается», и вместо этого будет использоваться уже запущенный. => Все вызовы на onNext
будут cached
и отправлены обоим абонентам, фактически нажимая мой бэкэнд только один раз.
Теперь, suppouse этот код:
// Observable loginTask
public void doLogin(Observable<UserInfo> loginTask) {
loginTask.subscribe(
(userInfo) -> {},
(throwable) -> {
if (userWantsToRetry()) {
doLogin(loinTask);
}
}
);
}
Где loginTask была составлена с предыдущим трансформатором. Ну, когда возникает ошибка (возможно, связь) и userWantsToRetry()
, тогда я в основном повторно вызову метод с тем же наблюдаемым. К сожалению, это было кешировано, и я получаю ту же ошибку, не ударяя performRequest()
снова, так как последовательность будет воспроизведена.
Есть ли способ, которым я мог бы иметь поведение «одинаковых запросов», которое предоставляет мне трансформатор И кнопка повтора?