0

Мое приложение должно сделать две вещи в целом:Предельные действия с оператором rxJava и retryWhen

  • Принимать только один запрос к сети, в то же время
  • Retry, если запрос не удалось

Вот как я его осуществить:

public class RequestsLocker { 

    private volatile boolean isLocked; 

    public <T> Observable.Transformer<T, T> applyLocker() { 
     if(!isLocked()) { 
      return observable -> observable 
        .doOnSubscribe(() -> { 
         lockChannel(); 
        }) 
        .doOnUnsubscribe(() -> { 
         freeChannel(); 
        }); 
     } else { 
      return observable -> Observable.error(new ChannelBusyException("Channel is busy now.")); 
     } 
    } 

    private void lockChannel() { 
     isLocked = true; 
    } 

    private void freeChannel() { 
     isLocked = false; 
    } 

    public boolean isLocked() { 
     return isLocked; 
    } 

} 

Выглядит хорошо.

Теперь моя retryWhen реализация:

public static Observable<?> retryWhenAnyIoExceptionWithDelay(Observable<? extends Throwable> observable) { 
    return observable.flatMap(error -> { 
     // For IOExceptions, we retry 
     if (error instanceof IOException) { 
      return Observable.timer(2, TimeUnit.SECONDS); 
     } 

     // For anything else, don't retry 
     return Observable.error(error); 
    }); 
} 

Существует, как я использую его:

public Observable<List<QueueCarItem>> finishService(int id, PaymentType paymentType, String notes) { 
    return carsQueueApi.finishService(id, new FinishCarServiceRequest(paymentType.getName(), notes)) 
      .compose(requestsLocker.applyLocker(RequestsLocker.RequestChannel.CHANGE)); 
} 

...

public void finishCarService(QueueCarItem carItem, PaymentType paymentType, 
          String notes, Subscriber<List<QueueCarItem>> subscriber) { 
    queueApiMediator.finishService(carItem.getId(), paymentType, notes) 
      .subscribeOn(ioScheduler) 
      .observeOn(uiScheduler) 
      .doOnError(this::handleError) 
      .retryWhen(RxOperatorsHelpers::retryWhenAnyIoExceptionWithDelay) 
      .subscribe(subscriber); 
} 

Основная проблема, которая doOnUnsubscribe() называется на любой ошибки и то шкафчик открыт для любого нового запроса до истечения таймера и повторной подписки снова. Это проблема. Пока таймер тикает, пользователь может сделать другой запрос.

Как я могу это исправить?

+1

Не могли бы вы разместить код, показывающий, как вы на самом деле используете трансформатор applyLocker и 'retryWhenAnyIoExceptionWithDelay'? – JohnWowUs

+0

@JohnWowUs готово. – Alexandr

ответ

1

Проблема заключается в том, что вы подаете свой трансформатор к источнику наблюдаемого т.е. до вашего retrywhen. При возникновении ошибки вы всегда отказываетесь от подписки, а затем повторно подписываетесь на источник, наблюдаемый , ведущий к вызываемому вами doOnUnsubscribe.

Я предлагаю вам попробовать

public Observable<List<QueueCarItem>> finishService(int id, PaymentType paymentType, String notes) { 
    return carsQueueApi.finishService(id, new FinishCarServiceRequest(paymentType.getName(), notes));    
} 


public void finishCarService(QueueCarItem carItem, PaymentType paymentType, 
          String notes, Subscriber<List<QueueCarItem>> subscriber) { 
    queueApiMediator.finishService(carItem.getId(), paymentType, notes) 
      .subscribeOn(ioScheduler) 
      .observeOn(uiScheduler) 
      .doOnError(this::handleError) 
      .retryWhen(RxOperatorsHelpers::retryWhenAnyIoExceptionWithDelay) 
      .compose(requestsLocker.applyLocker(RequestsLocker.RequestChannel.CHANGE)); 
      .subscribe(subscriber); 
} 

PS: применяется шкафчик трансформатор выглядит немного по-другому, т.е. он не принимает аргумент в коде вы связаны между собой.

+0

Да, я понимаю.В этом проблема: я решил попробовать CleanAcritecture от android10. Итак, 'RequetsLocker' и' finishService' метод находятся в 'слое данных'. Метод 'finishCarService' в домене. Теперь я думаю о перемещении retryWhen оператор в «слой данных». Но это нарушает философию CA, поскольку «слой данных» должен решить «ЧТО делать», а не «КАК». Другой способ - позволить 'RequestLocker'' на уровне домена. В этом случае я должен уведомить «доменный слой» об источнике данных, поскольку «RequestLocker» должен применяться только к сетевому запросу, а не к 'getFromCache'. Это тоже плохая практика. Мех .. – Alexandr

1

Использование retryWhen, чтобы избежать отказаться от подписки OnError вы должны использовать onErrorResumeNext который обыкновение отменить подписку.

Посмотрите этот пример

/** 
* Here we can see how onErrorResumeNext works and emit an item in case that an error occur in the pipeline and an exception is propagated 
*/ 
@Test 
public void observableOnErrorResumeNext() { 
    Subscription subscription = Observable.just(null) 
              .map(Object::toString) 
              .doOnError(failure -> System.out.println("Error:" + failure.getCause())) 
              .retryWhen(errors -> errors.doOnNext(o -> count++) 
                    .flatMap(t -> count > 3 ? Observable.error(t) : Observable.just(null)), 
                Schedulers.newThread()) 
              .onErrorResumeNext(t -> { 
               System.out.println("Error after all retries:" + t.getCause()); 
               return Observable.just("I save the world for extinction!"); 
              }) 
              .subscribe(s -> System.out.println(s)); 
    new TestSubscriber((Observer) subscription).awaitTerminalEvent(500, TimeUnit.MILLISECONDS); 
} 

Также о параллельности, если вы делаете операцию в оператора flatMap, вы можете указать Максимальное количество одновременных.

public final <R> Observable<R> flatMap(Func1<? super T, ? extends Observable<? extends R>> func, int maxConcurrent) { 
    if (getClass() == ScalarSynchronousObservable.class) { 
     return ((ScalarSynchronousObservable<T>)this).scalarFlatMap(func); 
    } 
    return merge(map(func), maxConcurrent); 
} 

Вы можете увидеть больше примеров здесь https://github.com/politrons/reactive

+0

спасибо. Я не могу наконец понять ваш код только сейчас, но я проведу его через несколько дней. – Alexandr

0

Мое текущее решение не разблокировать RequestLocker на IOException как в этом случае запроса будет повторяться после задержки.

public <T> Observable.Transformer<T, T> applyLocker() { 
    if(!isLocked()) { 
     return observable -> observable.doOnSubscribe(() -> { 
      lockChannel(); 
     }).doOnNext(obj -> { 
      freeChannel(); 
     }).doOnError(throwable -> { 
      if(throwable instanceof IOException) { 
       return; // as any request will be repeated in case of IOException 
      } 
      freeChannel(channel); 
     }); 
    } else { 
     return observable -> Observable.error(new ChannelBusyException("Channel is busy now")); 
    } 
}