2015-11-12 5 views
1

Я довольно новичок в rxJava. в моем ответе API, получить информацию об общем количестве страниц и номер текущей страницы, как:RxJava Pagination

"pages": 22, 
"page": 1, 

я использую Дооснащение сделать API вызовы в уровне данных, моя служба апи, как:

@GET("/story") 
    Observable <StoryCollectionEntity> storyCollection(
       @Query("feed_ids") String feed_items, 
       @Query("page") int page); 

затем:

public Observable<StoryCollectionEntity> storyCollection() { 
     return mUserApi.storyCollection(items,page); 
} 

я сделал подписку в области слоя, как это:

public void execute(Subscriber UseCaseSubscriber) { 
     this.subscription = this.buildUseCaseObservable() 
       .subscribeOn(Schedulers.from(threadExecutor)) 
       .observeOn(postExecutionThread.getScheduler()) 
       .subscribe(UseCaseSubscriber); 

    } 
@Override public Observable buildUseCaseObservable() { 
    return this.userRepository.stories(); 
    } 

Я выясняю, как я могу заставить наблюдателя отреагировать на событие scycling recyclerView, испустив результат следующей страницы. i.e стр. 2 к второму прокручивающему событию и стр. 3 на третьей прокрутке ... и т. д.

+0

Есть много примеров того, как сделать разбиение на страницы на recyclerView с помощью rx-java, просто введите его в Google – Than

ответ

3

Я вставлю ниже используемое мной решение, которое также использует Retrofit. В приведенном ниже примере я использую Retrofit для создания Observable всех общедоступных хранилищ из API GitHub.

Я считаю, что трюком здесь является то, что я рекурсиями на Наблюдаемом и конкатенации с другим Observable по существу триггера.

Этот триггер может излучать множество способов (возможно, когда пользователь прокручивает страницу вниз), однако в этом примере он сразу же испускает один раз, когда обрабатывается полная страница.

/*** 
* This is a helper wrapper Subscriber that helps you lazily defer 
* continuous paging of a result set from some API. 
* Through the use of a {@link Subject}, it helps notify the original {@link Observable} 
* when to perform an additional fetch. 
* The notification is sent when a certain count of items has been reached. 
* Generally this count represents the page. 
* @param <T> The event type 
*/ 
@Data 
public class PagingSubscriber<T> extends Subscriber<T> { 

    private final Subject<Void,Void> nextPageTrigger = PublishSubject.create(); 
    private final long pageSize; 
    private long count = 0; 
    private final Subscriber<T> delegate; 

    /*** 
    * Creates the {@link PagingSubscriber} 
    * @param pageSize 
    * @param delegate 
    */ 
    public PagingSubscriber(long pageSize, Subscriber<T> delegate) { 
     this.pageSize = pageSize; 
     this.delegate = delegate; 
    } 

    public Observable<Void> getNextPageTrigger() { 
     return nextPageTrigger; 
    } 

    @Override 
    public void onStart() { 
     delegate.onStart(); 
    } 

    @Override 
    public void onCompleted() { 
     delegate.onCompleted(); 
    } 

    @Override 
    public void onError(Throwable e) { 
     delegate.onError(e); 
    } 

    @Override 
    public void onNext(T t) { 
     count+=1; 
     if (count == pageSize) { 
      nextPageTrigger.onNext(null); 
      count= 0; 
     } 
     delegate.onNext(t); 
    } 

} 

@Data 
public class GitHubRepositoryApplication { 

    private final GitHubService gitHubService; 

    @Inject 
    public GitHubRepositoryApplication(GitHubService githubService) { 
     this.gitHubService = githubService; 
    } 

    public Observable<GitHubRepository> printAllRepositories(Observable<Void> nextPageTrigger) { 
     return printRepositoryPages(GitHubService.FIRST_PAGE, nextPageTrigger) 
       .flatMapIterable(r -> r.body()); 
    } 


    public Observable<Response<List<GitHubRepository>>> printRepositoryPages(String startingPage, Observable<Void> nextPageTrigger) { 
     return gitHubService.listRepos(startingPage) 
       .concatMap(response -> { 
        Optional<String> nextPage = Optional.ofNullable(response.headers().get(HttpHeaders.LINK)) 
          .flatMap(header -> GitHubServiceUtils.getNextToken(header)); 

        if (!nextPage.isPresent()) { 
         return Observable.just(response); 
        } 
        return Observable.just(response) 
          .concatWith(nextPageTrigger.limit(1).ignoreElements().cast(Response.class)) 
          .concatWith(printRepositoryPages(nextPage.get(), nextPageTrigger)); 
       }); 
    } 

    public static void main(String[] args) { 
     Injector injector = Guice.createInjector(new GitHubModule()); 

     GitHubRepositoryApplication app = injector.getInstance(GitHubRepositoryApplication.class); 

     Subscriber<GitHubRepository> subscriber = new Subscriber<GitHubRepository>() { 

      private final Logger log = LoggerFactory.getLogger(getClass()); 

      @Override 
      public void onStart() { 
       log.debug("STARTING"); 
       request(1l);//we need to begin the request 
      } 

      @Override 
      public void onCompleted() { 
       log.debug("COMPLETED"); 
      } 

      @Override 
      public void onError(Throwable e) { 
       log.error("ERROR",e); 
      } 

      @Override 
      public void onNext(GitHubRepository gitHubRepository) { 
       log.debug("{}",gitHubRepository); 
       request(1l);//we need to make sure we have asked for another element 
      } 
     }; 

     PagingSubscriber<GitHubRepository> pagingSubscriber = new PagingSubscriber<>(GitHubService.PAGE_SIZE, subscriber); 

     //In order for the JVM not to quit out, we make sure we turn our Observable to 
     //a BlockingObservable, so that all of it will finish. 
     Observable<GitHubRepository> observable = 
       app.printAllRepositories(pagingSubscriber.getNextPageTrigger()); 
     observable.toBlocking().subscribe(pagingSubscriber); 

    } 

}