2015-08-20 5 views
0

У меня есть наблюдаемый, который я создаю со следующим кодом.Пользовательский повтор Rxjava, наблюдаемый с оператором .cache?

Observable.create(new Observable.OnSubscribe<ReturnType>() { 

     @Override 
     public void call(Subscriber<? super ReturnType> subscriber) { 
     try { 
      if (!subscriber.isUnsubscribed()) { 
      subscriber.onNext(performRequest()); 
      } 
      subscriber.onCompleted(); 
     } catch (Exception e) { 
      subscriber.onError(e); 
     } 
     } 
    }); 

performRequest() будет выполнять длительные задачи, как можно было бы ожидать.

Теперь, так как я мог бы запускать же Observable дважды или более в очень короткий промежуток времени, я решил написать такой трансформатор:

protected Observable.Transformer<ReturnType, ReturnType> attachToRunningTaskIfAvailable() { 
    return origObservable -> { 
     synchronized (mapOfRunningTasks) { 
     // If not in maps 
     if (! mapOfRunningTasks.containsKey(getCacheKey())) { 
      Timber.d("Cache miss for %s", getCacheKey()); 
      mapOfRunningTasks.put(
       getCacheKey(), 
       origObservable 
        .doOnTerminate(() -> { 
        Timber.d("Removed from tasks %s", getCacheKey()); 
        synchronized (mapOfRunningTasks) { 
         mapOfRunningTasks.remove(getCacheKey()); 
        } 
        }) 
        .cache() 
     ); 
     } else { 
      Timber.d("Cache Hit for %s", getCacheKey()); 
     } 
     return mapOfRunningTasks.get(getCacheKey()); 
     } 
    }; 
    } 

Который в основном ставит оригинальные .cache наблюдаемые в HashMap<String, Observable>.

Это в основном запрещает несколько запросов с одинаковыми getCacheKey() (пример login) для вызова performRequest() в параллель. Вместо этого, если второй запрос login поступает в то время, когда выполняется другой, второй запрос наблюдается «отбрасывается», и вместо этого будет использоваться уже запущенный. => Все вызовы на onNext будут cached и отправлены обоим абонентам, фактически нажимая мой бэкэнд только один раз.

Теперь, suppouse этот код:

// Observable loginTask 
public void doLogin(Observable<UserInfo> loginTask) { 
    loginTask.subscribe(
     (userInfo) -> {}, 
     (throwable) -> { 
     if (userWantsToRetry()) { 
      doLogin(loinTask); 
     } 
     } 
    ); 
} 

Где loginTask была составлена ​​с предыдущим трансформатором. Ну, когда возникает ошибка (возможно, связь) и userWantsToRetry(), тогда я в основном повторно вызову метод с тем же наблюдаемым. К сожалению, это было кешировано, и я получаю ту же ошибку, не ударяя performRequest() снова, так как последовательность будет воспроизведена.

Есть ли способ, которым я мог бы иметь поведение «одинаковых запросов», которое предоставляет мне трансформатор И кнопка повтора?

ответ

0

У вашего вопроса много чего, и его трудно выразить прямо. Однако я могу сделать пару рекомендаций. Во-первых, ваш Observable.create можно упростить, используя Observable.defer(Func0<Observable<T>>). Это будет запускать func каждый раз, когда подписчик подписывается и улавливает и передает какие-либо исключения в onError подписчика.

Observable.defer(() -> { 
    return Observable.just(performRequest()); 
}); 

Далее вы можете использовать observable.repeatWhen(Func1<Observable<Void>, Observable<?>>), чтобы решить, когда вы хотите повторить. Операторы повтора будут повторно подписаны на наблюдаемые после события onComplete. Эта конкретная перегрузка отправит событие субъекту при получении события onComplete. Эта функция получит эту функцию. Ваша функция должна вызывать что-то вроде takeWhile(predicate) и onComplete, когда вы не хотите повторять попытку.

Observable.just(1,2,3).flatMap((Integer num) -> { 
    final AtomicInteger tryCount = new AtomicInteger(0); 
    return Observable.just(num) 
     .repeatWhen((Observable<? extends Void> notifications) -> 
      notifications.takeWhile((x) -> num == 2 && tryCount.incrementAndGet() != 3)); 
}) 
.subscribe(System.out::println); 

Выход:

1 
2 
2 
2 
3 

Приведенный выше пример показывает, что повторные попытки являются вслух, когда событие не 2 и до макс 22 повторов. Если вы перейдете на repeatWhen, то flatMap будет содержать ваше решение использовать наблюдаемый в кэше наблюдаемый или realWork. Надеюсь это поможет!

Смежные вопросы