2015-04-28 13 views
4

Я пытаюсь реализовать этот рабочий процесс с помощью rxJava, но я уверен, что если я злоупотребляю или делаю неправильно.Обработка кэша с помощью RXJava

  • Пользователь просит войти
  • Если loginResult доступна в кэше, то «выбрасывают», сохраненную в кэше LoginResult
  • Else фактически выполняет запрос к веб-сервиса и кэшировать результат, если все успешна
  • Если ошибка повторяет попытку не более 3 раз, и если есть 4-й раз, то очистите кеш.

Вот мой полный фрагмент кода.

public class LoginTask extends BaseBackground<LoginResult> { 
    private static CachedLoginResult cachedLoginResult = new CachedLoginResult(); 
    private XMLRPCClient xmlrpcClient; 
    private UserCredentialsHolder userCredentialsHolder; 

    @Inject 
    public LoginTask(XMLRPCClient client, UserCredentialsHolder userCredentialsHolder) { 
    this.xmlrpcClient = client; 
    this.userCredentialsHolder = userCredentialsHolder; 
    } 

    @Override 
    public LoginResult performRequest() throws Exception { 
    return UserApi.login(
     xmlrpcClient, 
     userCredentialsHolder.getUserName(), 
     userCredentialsHolder.getPlainPassword()); 


    } 

    @Override 
    public Observable<LoginResult> getObservable() { 
    return cachedLoginResult.getObservable() 
     .onErrorResumeNext(
      Observable.create(
       ((Observable.OnSubscribe<LoginResult>) subscriber -> { 
        try { 
        if (!subscriber.isUnsubscribed()) { 
         subscriber.onNext(performRequest()); // actually performRequest 
        } 
        subscriber.onCompleted(); 
        } catch (Exception e) { 
        subscriber.onError(e); 
        } 
       }) 
      ) 
       .doOnNext(cachedLoginResult::setLoginResult) 
       .retry((attempts, t) -> attempts < 3) 
       .doOnError(throwable -> cachedLoginResult.purgeCache()) 
     ); 
    } 


    private static class CachedLoginResult { 
    private LoginResult lr = null; 
    private long when = 0; 

    private CachedLoginResult() { 
    } 

    public boolean hasCache() { 
     return lr != null && when + TimeUnit.MILLISECONDS.convert(30, TimeUnit.MINUTES) > System.currentTimeMillis(); 
    } 

    public void setLoginResult(LoginResult lr) { 
     if (lr != null) { 
      this.lr = lr; 
      this.when = System.currentTimeMillis(); 
     } 
    } 

    public void purgeCache() { 
     this.lr = null; 
     this.when = 0; 
    } 

    public Observable<LoginResult> getObservable() { 
     return Observable.create(new Observable.OnSubscribe<LoginResult>() { 
     @Override 
     public void call(Subscriber<? super LoginResult> subscriber) { 
      if (!subscriber.isUnsubscribed()) { 
      if (hasCache()) { 
       subscriber.onNext(lr); 
       subscriber.onCompleted(); 
      } else { 
       subscriber.onError(new RuntimeException("No cache")); 
      } 
      } 
     } 
     }); 
    } 
    } 
} 

Поскольку я wan't смог найти подобные примеры и я начал «играть» с rxjava только 1 день назад я не уверен в моей реализации.

Спасибо за ваше время.

ответ

2

Я думаю, что этот код хорошо, хорошая работа :)

Вы были правы использовать Observable.create в вашем LoginTask, потому что в противном случае результат вызова может быть кэшируются, а затем retry не помогло бы много ...

Это, я думаю, не является необходимым для CachedLoginResultObservable. Здесь вы можете упростить код с помощью Observable.just и Observable.error вспомогательных методов, что-то вроде:

public Observable<LoginResult> getObservable() { 
    if (hasCache()) { 
     return Observable.just(lr); 
    } else { 
     return Observable.error(new RuntimeException("No cache")); 
    } 
} 

Примечания: just сохраняет значение вы сказать ему, чтобы излучать внутренне, так что resubscriptions всегда будет производить это значение. Это то, что я намекнул выше, вы не должны делать Observable.just(performRequest()).retry(3), например, потому что performRequest будет когда-либо вызываться один раз.

+0

Здравствуй Саймон, правильно, если «м неправильно, но с помощью Анжелеса и .error заставит значение испускаются при наблюдаемом создании. Что произойдет, если я создам основное наблюдаемое и использую его после истечения срока действия кэша? Думаю, он просто вернет мне 30-минутный кеш, который должен быть истек уже сейчас? –

+0

Вы правы, просто захватите значение кеша в тот момент, когда вы вызываете getObservable(), поэтому «Observable.create» или «Observable.defer» будет иметь смысл. Также посмотрите на ответ Акарнокда;) –

0

Если я правильно понял, вы хотите выполнить логин один раз и кэшировать результат реактивным способом? Если да, то вот пример того, как я хотел бы сделать это:

import java.util.concurrent.ThreadLocalRandom; 

import rx.*; 
import rx.schedulers.Schedulers; 
import rx.subjects.AsyncSubject; 


public class CachingLogin { 
    static class LoginResult { 

    } 
    /** Guarded by this. */ 
    AsyncSubject<LoginResult> cache; 
    public Observable<LoginResult> login(String username, String password) { 
     AsyncSubject<LoginResult> c; 
     boolean doLogin = false; 
     synchronized (this) { 
      if (cache == null || cache.hasThrowable()) { 
       cache = AsyncSubject.create(); 
       doLogin = true; 
      } 
      c = cache; 
     } 
     if (doLogin) { 
      Observable.just(1).subscribeOn(Schedulers.io()) 
      .map(v -> loginAPI(username, password)) 
      .retry(3).subscribe(c); 
     } 
     return c; 
    } 
    public void purgeCache() { 
     synchronized (this) { 
      cache = null; 
     } 
    } 
    static LoginResult loginAPI(String username, String password) { 
     if (ThreadLocalRandom.current().nextDouble() < 0.3) { 
      throw new RuntimeException("Failed"); 
     } 
     return new LoginResult(); 
    } 
} 
+0

Здравствуйте, akarnokd, благодарю вас за ответ. Это похоже на хороший способ сделать что-то в rxjava :) ЕСЛИ я правильно его понимаю, когда вы вызываете свой метод «входа», он немедленно вызывает loginAPI() (если кеш отсутствует). Хотя это может иметь смысл в большинстве сценариев, я хотел бы получить наблюдаемый, который пинает настоящий HTTP-вызов, чтобы я мог использовать observOn и subscribeOn в зависимости от варианта использования. –

Смежные вопросы