2016-04-29 3 views
3

Сейчас я реализую некоторую логику опроса с помощью RxJava. Я должен опросить конечную точку несколько раз, пока она не сообщит мне остановиться. Кроме того, каждый ответ возвращается со временем, когда я должен задерживаться до опроса. Моя логика выглядит что-то вроде этого прямо сейчас:Значение динамической задержки с помощью repeatWhen()

service.pollEndpoint() 
    .repeatWhen(observable -> observable.delay(5000, TimeUnit.MILLISECONDS)) 
    .takeUntil(Blah::shouldStopPolling); 

Сейчас у меня есть значение задержки зашиты до 5000, но я хотел бы, чтобы зависеть от величины в ответ на опрос. Я попробовал использовать плоскую карту, которая вернула Observable.just(pollResponse).repeatWhen(observable -> observable.delay(pollResponse.getDelay(), TimeUnit.MILLISECONDS)), но это не казалось правильной идеей, так как она перепуталась с источником Observable. Я чувствую, что это что-то простое, что я пропускаю. Благодаря!

ответ

0

Вы можете использовать побочный эффект оператора doOnNext обновить переменную задержки, а затем использовать это в вашем repeatWhen

int pollDelay = 5000; 

service.pollEndpoint() 
.doOnNext(pollResponse -> pollDelay=pollResponse.getDelay()) 
.repeatWhen(observable -> observable.delay(pollDelay, TimeUnit.MILLISECONDS)) 
.takeUntil(Blah::shouldStopPolling); 
+0

В идеале я хотел бы там не должно быть никаких побочных эффектов. :/ –

+0

Единственный способ, которым я вижу, это сделать, чтобы написать свой собственный оператор стиля повтора. – JohnWowUs

2

Как уже упоминалось @JohnWowUs, вам необходимо вне зоны связи, но если вы подписались на последовательность более чем один раз, вы можете использовать defer иметь для каждого абонента состояние:

Observable.defer(() -> { 
    int[] pollDelay = { 0 }; 
    return service.pollEndpoint() 
    .doOnNext(response -> pollDelay[0] = response.getDelay()) 
    .repeatWhen(o -> o.flatMap(v -> Observable.timer(pollDelay[0], MILLISECONDS))) 
    .takeUntil(Blah::shouldStopPolling); 
}); 
+0

Не могли бы вы взглянуть на решение, которое я разместил? Используя рекурсию, нет необходимости в внеполосной связи, но я не уверен, что это привело к возникновению каких-либо проблем, которые я пропустил. –

0

Это решение, которое я закончил с использованием:

public static Observable<PollResponse> createPollObservable(RetrofitService service, PollResponse response) { 
    return Blah::shouldStopPolling(response) 
     ? Observable.empty() 
     : service 
      .pollEndpoint() 
      .delaySubscription(getPollDelay(response), TimeUnit.MILLISECONDS) 
      .concatMap(response1 -> createPollObservable(service, response1) 
        .startWith(response1) 
        .takeUntil(Blah::shouldStopPolling) 
      ); 
} 

Вместо этого он использует рекурсию, чтобы всегда иметь последний объект PollResponse, а также переключается на delaySubscription(), а не repeatWhen().

+0

@akarnokd Это решение, которое я использую прямо сейчас, и, похоже, он отлично работает без какого-либо внеполосного общения. Все в порядке? –

0

Вы могли злоупотребление retryWhen - но я просто говорю, что это возможно, не то, что вы должны сделать:

package com.example.retrywhen; 

import com.example.LoggingAction1; 

import org.pcollections.PVector; 
import org.pcollections.TreePVector; 

import java.util.concurrent.TimeUnit; 
import java.util.concurrent.atomic.AtomicInteger; 

import rx.Observable; 
import rx.functions.Action0; 
import rx.functions.Func0; 
import rx.functions.Func1; 
import rx.schedulers.Schedulers; 

import static com.example.Utils.sleep; 

public class RetryWhenDynamicDelayTest { 

    final static PVector<Integer> delays = TreePVector.<Integer>singleton(500).plus(1_000).plus(2_000); 

    final static AtomicInteger count = new AtomicInteger(0); 

    final static Observable<Integer> willCycleThroughTheList = Observable.defer(new Func0<Observable<Integer>>() { 
     @Override 
     public Observable<Integer> call() { 
      return Observable.just(delays.get(count.getAndIncrement() % 3)); 
     } 
    }); 

    static class ThisIsNotReallyAnException extends Throwable { 
     final Integer integer; 

     ThisIsNotReallyAnException(Integer integer) { 
      this.integer = integer; 
     } 
    } 


    public static void main(String[] args) { 

     final long now = System.currentTimeMillis(); 

     willCycleThroughTheList.flatMap(new Func1<Integer, Observable<?>>() { 
      @Override 
      public Observable<?> call(Integer integer) { 
       return Observable.error(new ThisIsNotReallyAnException(integer)); 
      } 
     }) 
       .doOnUnsubscribe(new Action0() { 
        @Override 
        public void call() { 
         System.out.println("Millis since start: " + (System.currentTimeMillis() - now)); 
        } 
       }) 
       .onErrorResumeNext(new Func1<Throwable, Observable<Integer>>() { 
        @Override 
        public Observable<Integer> call(Throwable throwable) { 
         if (throwable instanceof ThisIsNotReallyAnException) { 
          ThisIsNotReallyAnException thisIsNotReallyAnException = (ThisIsNotReallyAnException) throwable; 
          return Observable.just((thisIsNotReallyAnException.integer)).concatWith(Observable.error(throwable)); 
         } else { 
          return Observable.error(throwable); 
         } 
        } 
       }) 
       .retryWhen(new Func1<Observable<? extends Throwable>, Observable<?>>() { 
        @Override 
        public Observable<?> call(Observable<? extends Throwable> observable) { 
         return observable.flatMap(new Func1<Throwable, Observable<?>>() { 
          @Override 
          public Observable<?> call(Throwable throwable) { 
           if (throwable instanceof ThisIsNotReallyAnException) { 
            ThisIsNotReallyAnException thisIsNotReallyAnException = (ThisIsNotReallyAnException) throwable; 
            return Observable.timer(thisIsNotReallyAnException.integer, TimeUnit.MILLISECONDS); 
           } else { 
            return Observable.error(throwable); 
           } 
          } 
         }); 
        } 
       }) 
       .subscribeOn(Schedulers.io()) 
       .subscribe(new LoggingAction1<Object>("")); 

     sleep(10_000); 
    } 
} 

Печать:

Millis since start: 75 
call(): 500 
Millis since start: 590 
call(): 1000 
Millis since start: 1591 
call(): 2000 
Millis since start: 3593 
call(): 500 
Millis since start: 4094 
call(): 1000 
Millis since start: 5095 
call(): 2000 
Millis since start: 7096 
call(): 500 
Millis since start: 7597 
call(): 1000 
Millis since start: 8598 
call(): 2000 
Смежные вопросы