2016-10-28 6 views
1

Существует метод updateFromRemote():RxJava ждать булевой (условие)

public class WorkshiftSettingsDaoImpl implements WorkshiftSettingsDao { 

    private boolean isUpdating = false; 

    public Observable<WorkshiftSettings> updateFromRemote() { 
     return remoteDataStore.get() 
       .retryWhen(RxOperatorsHelpers::retryWhenAnyIoExceptionWithDelay) 
       .doOnSubscribe(this::setUpdatingStarted) 
       .doOnUnsubscribe(this::setUpdatingFinished) 
       .flatMap(workshiftSettings -> localDataStore.put(workshiftSettings)); 
    } 

    private void setUpdatingStarted() { 
     if(isUpdating) throw new RuntimeException("already updating"); 
     isUpdating = true; 
    } 

    private void setUpdatingFinished() { 
     if(!isUpdating) throw new RuntimeException("already finished"); 
     isUpdating = false; 
    } 

} 

Как можно реализовать такое поведение:

если isUpdating == true затем ждать, пока он не будет изменен на false и выполнить представленную цепочку.

если isUpdating == false, то просто выполните представленную цепочку.

Существует мое решение:

public Observable<WorkshiftSettings> updateFromRemote() { 
    Observable<WorkshiftSettings> updateRemoveDataObservable = remoteDataStore.get() 
      .retryWhen(RxOperatorsHelpers::retryWhenAnyIoExceptionWithDelay) 
      .doOnSubscribe(this::setUpdatingStarted) 
      .doOnUnsubscribe(this::setUpdatingFinished) 
      .flatMap(workshiftSettings -> localDataStore.put(workshiftSettings)); 

    return Observable.fromCallable(() -> { 
     while (isUpdating) { 
      Thread.sleep(1000); 
     } 
     return null; 
    }).concatMap(o -> updateRemoveDataObservable); 
} 

Но я думаю, что есть что-то не так :)

Любая лучшая идея?

+0

ли это жесткое требование, чтобы вы действовали на изменение isUpdating, или можно добавить метод setUpdating? Если вы можете вызвать «onNext» на объект «Subject» из setUpdating и объединить тему с вашим наблюдаемым. –

+0

'isUpdating' изменен' setUpdatingStarted() '/' setUpdatingFinished() '. Я редактировал свой вопрос, чтобы включить их. – Alexandr

ответ

3

Не уверен, я правильно понял ваш поток. Но я думаю, у вас есть «место», где isUpdating изменено. Если да, то вы можете просто создать final BehaviorSubject<Boolean> isUpdatingSubject = BehaviorSubject.<Boolean>create() и в этом «месте», вместо того чтобы изменить логическую переменную, вы делаете: isUpdatingSubject.onNext(false)

И

public Observable<WorkshiftSettings> updateFromRemote() { 
    return isUpdatingSubject 
      .distinctUntilChanged() 
      .filter(new Func1<Boolean, Boolean>() { 
       @Override 
       public Boolean call(BookingErrorActivity isUpdating) { 
        return !isUpdating; 
       } 
      }) 
      .flatMap(new Func1<Object, Observable<WorkshiftSettings>>() { 
       @Override 
       public Observable<WorkshiftSettings> call(Object o) { 
        return remoteDataStore.get() 
          .retryWhen(RxOperatorsHelpers::retryWhenAnyIoExceptionWithDelay) 
          .doOnSubscribe(this::setUpdatingStarted) 
          .doOnUnsubscribe(this::setUpdatingFinished) 
          .flatMap(workshiftSettings -> localDataStore.put(workshiftSettings)); 
       } 
      }); 

} 

Но быть осторожность,: с таким раствором этого наблюдаемого будет излучать каждый раз, когда isUpdatingSubject испускает true, поэтому иногда это может быть не круто. Чтобы предотвратить от него вы можете использовать .first() перед тем flatMap()

PS Вы можете прочитать больше о BehaviorSubject

PSS К сожалению, я использовал синтаксис java7

+0

Спасибо за ответ, но я имел в виду что-то еще. 'isUpdating' изменено самим' WorkshiftSettingsDaoImpl'. См. Обновленный вопрос. – Alexandr

+0

Хорошо, я думаю, вам нужно '.debounce (1000, TimeUnit.MILLISECONDS, uiScheduler)' –

+1

Хорошо, мне кажется, вам нужно что-то вроде: .debounce (1000, TimeUnit.MILLISECONDS, uiScheduler) .withLatestFrom (isUpdatingSubject, [secondParam]) .filter ([isFalse]) ' –

Смежные вопросы