2016-10-25 2 views
0

У меня есть следующий фрагмент кода:Как упростить поток rxjava с таймаутов

Observable.just(1) 
    .flatMap(-> doSomething1) 
    .timeout(10, SECONDS) 
    .flatMap(-> doSomething2) 
    .timeout(10, SECONDS) 
    .flatMap(-> doSomething3) 
    .timeout(10, SECONDS) 
    .flatMap(-> doSomething4) 
    .timeout(10, SECONDS) 
    .subscribe(); 

Я не хочу повторяться после каждого flatMap добавления timeout. Моя первая мысль заключалась в том, чтобы применить только timeout к началу или концу потока, но это не то поведение, которое я намерен, поскольку оно применяет только тайм-аут к более близкому наблюдаемому.

Observable.just(1) 
    .flatMap(-> doSomething1) 
    .flatMap(-> doSomething2) 
    .flatMap(-> doSomething3) 
    .flatMap(-> doSomething4) 
    .timeout(10, SECONDS) 
    .subscribe(); 

Observable.just(1) 
    .timeout(10, SECONDS) 
    .flatMap(-> doSomething1) 
    .flatMap(-> doSomething2) 
    .flatMap(-> doSomething3) 
    .flatMap(-> doSomething4) 
    .subscribe(); 

doSomethingX The функции выполняют немного кода на вызов, который может занять некоторое время, прежде чем вернуться на следующий наблюдаемым, которая сама по себе не должна быть свернута в тайм-аут.

Как это можно улучшить?

ОБНОВЛЕНИЕ:

вывода более практический пример ниже. Идея заключается в создании потока, который я могу повторить в случае сбоя или таймаута. Я имитирую сценарий, когда один из операторов однажды отключается, но работает над повторением.

@Test 
public void streamToBeSimplified() throws Exception { 
    final AtomicBoolean retry = new AtomicBoolean(true); 

    Action1<Object> print = new Action1<Object>() { 
     @Override 
     public void call(Object o) { 
      System.out.println(" >>>" + o); 
     } 
    }; 

    Observable.just(1) 
      .doOnNext(print) 
      .flatMap(new Func1<Integer, Observable<Integer>>() { 
       @Override 
       public Observable<Integer> call(Integer integer) { 
        return Observable.just(2); 
       } 
      }) 
      .timeout(1, TimeUnit.SECONDS) 
      .doOnNext(print) 
      .flatMap(new Func1<Object, Observable<Integer>>() { 
       @Override 
       public Observable<Integer> call(Object o) { 

        if(retry.getAndSet(false)) { 
         try { 
          Thread.sleep(2000L); 
         } catch (InterruptedException e) { 
          e.printStackTrace(); 
         } 
        } 
        return Observable.just(3); 
       } 
      }) 
      .timeout(1, TimeUnit.SECONDS) 
      .doOnNext(print) 
      .retry(2) 
      .subscribe(); 

} 
+0

Является doSomethingX Наблюдаемое или метод-Call, как в doSomethingX()? –

+0

Я добавил лучший пример –

ответ

1

Вы можете создать вспомогательный метод как это:

private Observable doThings() { 
    return Observable.just(1) 
     .flatMap(__ -> withTimeout(doSomething1, 10, TimeUnit.SECONDS)) 
     .flatMap(__ -> withTimeout(doSomething2, 10, TimeUnit.SECONDS)); 
     // etc 
} 

private static <T> Observable<T> withTimeout(Observable<T> observable, long time, TimeUnit timeUnit) { 
    return observable 
      .timeout(time, timeUnit); 
} 
+0

Добавлен лучший пример. 'flatMap' фактически получает' Func1', поэтому там нет «Observable». Может быть, я должен переключиться так, как я думаю об этой композиции. –

Смежные вопросы