2016-07-17 3 views
2

Я играю с оператором RxJava retryWhen. Об этом мало что можно узнать в интернете, единственное, достойное упоминания, - this. Это тоже не позволяет исследовать различные варианты использования, которые я хотел бы понять. Я также бросил асинхронное выполнение и повторил попытку, чтобы сделать его более реалистичным.RxJava retryКогда странное поведение

Моя настройка проста: у меня есть класс ChuckNorrisJokesRepository, который возвращает случайное число шуток Чака Норриса из файла JSON. Мой класс тестируется ChuckNorrisJokesService, который показан ниже. Случаи использования я заинтересован в следующем:

  1. преуспевает на 1-й попытки (без повторных попыток)
  2. терпит неудачу после 1 повторить попытку
  3. Попытки повторами 3 раза, но преуспевает на второй, следовательно, не повторить третий раз
  4. преуспевает на 3 повтора

Примечание: проект доступен на моем GitHub.

ChuckNorrisJokesService.java:

@Slf4j 
@Builder 
public class ChuckNorrisJokesService { 
    @Getter 
    private final AtomicReference<Jokes> jokes = new AtomicReference<>(new Jokes()); 

    private final Scheduler scheduler; 
    private final ChuckNorrisJokesRepository jokesRepository; 
    private final CountDownLatch latch; 
    private final int numRetries; 
    private final Map<String, List<String>> threads; 

    public static class ChuckNorrisJokesServiceBuilder { 
     public ChuckNorrisJokesService build() { 
      if (scheduler == null) { 
       scheduler = Schedulers.io(); 
      } 

      if (jokesRepository == null) { 
       jokesRepository = new ChuckNorrisJokesRepository(); 
      } 

      if (threads == null) { 
       threads = new ConcurrentHashMap<>(); 
      } 

      requireNonNull(latch, "CountDownLatch must not be null."); 

      return new ChuckNorrisJokesService(scheduler, jokesRepository, latch, numRetries, threads); 
     } 
    } 

    public void setRandomJokes(int numJokes) { 
     mergeThreadNames("getRandomJokes"); 

     Observable.fromCallable(() -> { 
      log.debug("fromCallable - before call. Latch: {}.", latch.getCount()); 
      mergeThreadNames("fromCallable"); 
      latch.countDown(); 

      List<Joke> randomJokes = jokesRepository.getRandomJokes(numJokes); 
      log.debug("fromCallable - after call. Latch: {}.", latch.getCount()); 

      return randomJokes; 
     }).retryWhen(errors -> 
       errors.zipWith(Observable.range(1, numRetries), (n, i) -> i).flatMap(retryCount -> { 
        log.debug("retryWhen. retryCount: {}.", retryCount); 
        mergeThreadNames("retryWhen"); 

        return Observable.timer(retryCount, TimeUnit.SECONDS); 
       })) 
       .subscribeOn(scheduler) 
       .subscribe(j -> { 
          log.debug("onNext. Latch: {}.", latch.getCount()); 
          mergeThreadNames("onNext"); 

          jokes.set(new Jokes("success", j)); 
          latch.countDown(); 
         }, 
         ex -> { 
          log.error("onError. Latch: {}.", latch.getCount(), ex); 
          mergeThreadNames("onError"); 
         }, 
         () -> { 
          log.debug("onCompleted. Latch: {}.", latch.getCount()); 
          mergeThreadNames("onCompleted"); 

          latch.countDown(); 
         } 
       ); 
    } 

    private void mergeThreadNames(String methodName) { 
     threads.merge(methodName, 
       new ArrayList<>(Arrays.asList(Thread.currentThread().getName())), 
       (value, newValue) -> { 
        value.addAll(newValue); 

        return value; 
       }); 
    } 
} 

Для краткости я буду показывать только тестовый случай Спок для 1-го варианта использования. См. Мои GitHub для других тестовых случаев.

def "succeeds on 1st attempt"() { 
    setup: 
    CountDownLatch latch = new CountDownLatch(2) 
    Map<String, List<String>> threads = Mock(Map) 
    ChuckNorrisJokesService service = ChuckNorrisJokesService.builder() 
      .latch(latch) 
      .threads(threads) 
      .build() 

    when: 
    service.setRandomJokes(3) 
    latch.await(2, TimeUnit.SECONDS) 

    Jokes jokes = service.jokes.get() 

    then: 
    jokes.status == 'success' 
    jokes.count() == 3 

    1 * threads.merge('getRandomJokes', *_) 
    1 * threads.merge('fromCallable', *_) 
    0 * threads.merge('retryWhen', *_) 
    1 * threads.merge('onNext', *_) 
    0 * threads.merge('onError', *_) 
    1 * threads.merge('onCompleted', *_) 
} 

Это терпит неудачу с:

Too few invocations for: 

1 * threads.merge('fromCallable', *_) (0 invocations) 
1 * threads.merge('onNext', *_) (0 invocations) 

Что я ожидаю, что fromCallable вызывается один раз, ей это удается, onNext вызывается один раз, а затем onCompleted. Что мне не хватает?

P.S .: Полное раскрытие информации - Я также разместил этот вопрос на RxJava GitHub.

ответ

1

Я решил это после нескольких часов устранения неполадок и с помощью члена ReactiveX Дэвида Карнка.

retryWhen является сложным, возможно, даже багги-оператором. В официальном документе и, по крайней мере, один ответ здесь используется оператор range, который немедленно завершается, если повторных попыток не требуется. Смотрите мой discussion с Дэвидом Карноком.

код доступен на мои GitHub комплекте со следующим тестовыми:

  1. преуспевает на 1-й попытку (без повторных попыток)
  2. завершается после 1 повторных попыток
  3. Попытки повторов 3 раза, но преуспевает второй, следовательно, не повторяет 3-й раз
  4. преуспевает на 3 повтора
+0

Так что и стало причиной того, что «странно поведение'? –

+0

@ YaroslavStavnichiy Вы можете перейти по ссылке в своем ответе для получения более подробной информации, но в кратком 'range' завершена сразу, таким образом пропустив' onNext' и перейдя прямо к 'onCompleted'. Как я уже сказал в своем ответе, с тех пор я переписал код, чтобы не использовать 'range'. –

+0

Я спросил, потому что я не видел этого в вашем ответе, возможно, вам следует его обновить. И почему именно «диапазон» завершился немедленно? Это потому, что 'numRetries' был установлен на ноль (вы забыли его установить)? –

Смежные вопросы