2016-05-19 8 views
4

У меня довольно стандартная проблема с разбивкой по страницам API, с которой вы можете справиться с некоторой простой рекурсией. Вот пример:Paginate Наблюдаемые результаты без рекурсии - RxJava

public Observable<List<Result>> scan() { 
    return scanPage(Optional.empty(), ImmutableList.of()); 
} 

private Observable<?> scanPage(Optional<KEY> startKey, List<Result> results) { 
    return this.scanner.scan(startKey, LIMIT) 
      .flatMap(page -> { 
       if (!page.getLastKey().isPresent()) { 
        return Observable.just(results); 
       } 
       return scanPage(page.getLastKey(), ImmutableList.<Result>builder() 
         .addAll(results) 
         .addAll(page.getResults()) 
         .build() 
       ); 
      }); 
} 

Но это, очевидно, может создать массивный столбец. Как я могу сделать это обязательно, но поддерживать поток Observable?

Вот императив блокировании пример:

public List<Result> scan() { 
    Optional<String> startKey = Optional.empty(); 
    final ImmutableList.Builder<Result> results = ImmutableList.builder(); 

    do { 
     final Page page = this.scanner.scan(startKey); 
     startKey = page.getLastKey(); 
     results.addAll(page.getResults()); 
    } while (startKey.isPresent()); 

    return results.build(); 
} 

ответ

1

Это не самый элегантный из решений, но вы можете использовать предметы и побочные эффекты. См. Пример игрушки ниже

import rx.Observable; 
import rx.Subscriber; 
import java.util.ArrayList; 
import java.util.List; 
import java.util.HashMap; 
import rx.subjects.*; 

public class Pagination { 
    static HashMap<String,ArrayList<String>> pages = new HashMap<String,ArrayList<String>>(); 

    public static void main(String[] args) throws InterruptedException { 
     pages.put("default", new ArrayList<String>()); 
     pages.put("2", new ArrayList<String>()); 
     pages.put("3", new ArrayList<String>()); 
     pages.put("4", new ArrayList<String>()); 

     pages.get("default").add("2"); 
     pages.get("default").add("Maths"); 
     pages.get("default").add("Chemistry"); 

     pages.get("2").add("3"); 
     pages.get("2").add("Physics"); 
     pages.get("2").add("Biology"); 

     pages.get("3").add("4"); 
     pages.get("3").add("Art"); 

     pages.get("4").add(""); 
     pages.get("4").add("Geography"); 



     Observable<List<String>> ret = Observable.defer(() -> 
     { 
      System.out.println("Building Observable"); 
      ReplaySubject<String> pagecontrol = ReplaySubject.<String>create(1); 
      Observable<List<String>> ret2 = pagecontrol.asObservable().concatMap(aKey -> 
      { 
       if (!aKey.equals("")) { 
        return Observable.just(pages.get(aKey)).doOnNext(page -> pagecontrol.onNext(page.get(0))); 
       } else { 
        return Observable.<List<String>>empty().doOnCompleted(()->pagecontrol.onCompleted()); 
       } 
      }); 
      pagecontrol.onNext("default"); 
      return ret2; 
     }); 
     // Use this if you want to ensure work isn't done again 
     ret = ret.cache(); 
     ret.subscribe(l -> System.out.println("Sub 1 : " + l)); 
     ret.subscribe(l -> System.out.println("Sub 2 : " + l)); 
     Thread.sleep(2000L); 
    } 
} 

Отредактировано с улучшениями.

+0

К сожалению, я не думаю, что будет работать. Результат будет первым результатом сканирования, так как это будет установлено, но поток основного потока выполнения обнаружит, что currentKey еще не установлен, и выйдите. –

+0

Похоже, вы правы. Проблема - это ключ. Если нет обычного способа его генерации, например. «1», «2» и т. Д., Тогда я думаю, что вы застряли с рекурсивным решением, которое имеет свои проблемы, если у вас слишком много страниц. – JohnWowUs

+0

См. Отредактированный ответ для хакерского решения с использованием темы и побочных эффектов. – JohnWowUs

1

Другого подход заключается в использовании маркера потока: получить данные для исходных маркеров, нажмите следующий маркер на поток сразу получают свежие данные дистанционных и повторно подписать, пока маркер не пуст

public Observable<Window> paging() { 

     Subject<Token, Token> tokenStream = BehaviorSubject.<Token>create().toSerialized(); 

     tokenStream.onNext(Token.startToken()); 

     Observable<Window> dataStream = 
       Observable.defer(() -> tokenStream.first().flatMap(this::remoteData)) 
         .doOnNext(window -> tokenStream.onNext(window.getToken())) 
         .repeatWhen(completed -> completed.flatMap(__ -> tokenStream).takeWhile(Token::hasMore)); 

     return dataStream; 
    } 

В результате

Window{next token=Token{key='1'}, data='data for token: Token{key=''}'} 
Window{next token=Token{key='2'}, data='data for token: Token{key='1'}'} 
Window{next token=Token{key='3'}, data='data for token: Token{key='2'}'} 
Window{next token=Token{key='4'}, data='data for token: Token{key='3'}'} 
Window{next token=Token{key='5'}, data='data for token: Token{key='4'}'} 
Window{next token=Token{key='6'}, data='data for token: Token{key='5'}'} 
Window{next token=Token{key='7'}, data='data for token: Token{key='6'}'} 
Window{next token=Token{key='8'}, data='data for token: Token{key='7'}'} 
Window{next token=Token{key='9'}, data='data for token: Token{key='8'}'} 
Window{next token=Token{key='10'}, data='data for token: Token{key='9'}'} 

Копия pastable образец

public class RxPaging { 

    public Observable<Window> paging() { 

     Subject<Token, Token> tokenStream = BehaviorSubject.<Token>create().toSerialized(); 

     tokenStream.onNext(Token.startToken()); 

     Observable<Window> dataStream = 
       Observable.defer(() -> tokenStream.first().flatMap(this::remoteData)) 
         .doOnNext(window -> tokenStream.onNext(window.getToken())) 
         .repeatWhen(completed -> completed.flatMap(__ -> tokenStream).takeWhile(Token::hasMore)); 

     return dataStream; 
    } 

    private Observable<Window> remoteData(Token token) { 
     /*limit number of pages*/ 
     int page = page(token); 
     Token nextToken = page < 10 
       ? nextPageToken(token) 
       : Token.endToken(); 

     return Observable 
       .just(new Window(nextToken, "data for token: " + token)) 
       .delay(100, TimeUnit.MILLISECONDS); 
    } 

    private int page(Token token) { 
     String key = token.getKey(); 
     return key.isEmpty() ? 0 : Integer.parseInt(key); 
    } 

    private Token nextPageToken(Token token) { 
     String tokenKey = token.getKey(); 
     return tokenKey.isEmpty() ? new Token("1") : nextToken(tokenKey); 
    } 

    private Token nextToken(String tokenKey) { 
     return new Token(String.valueOf(Integer.parseInt(tokenKey) + 1)); 
    } 

    public static class Token { 
     private final String key; 

     private Token(String key) { 
      this.key = key; 
     } 

     public static Token endToken() { 
      return startToken(); 
     } 

     public static Token startToken() { 
      return new Token(""); 
     } 

     public String getKey() { 
      return key; 
     } 

     public boolean hasMore() { 
      return !key.isEmpty(); 
     } 

     @Override 
     public String toString() { 
      return "Token{" + 
        "key='" + key + '\'' + 
        '}'; 
     } 
    } 


    public static class Window { 
     private final Token token; 
     private final String data; 

     public Window(Token token, String data) { 
      this.token = token; 
      this.data = data; 
     } 

     public Token getToken() { 
      return token; 
     } 

     public String getData() { 
      return data; 
     } 

     @Override 
     public String toString() { 
      return "Window{" + 
        "next token=" + token + 
        ", data='" + data + '\'' + 
        '}'; 
     } 
    } 

    @Test 
    public void testPaging() throws Exception { 
     paging().toBlocking().subscribe(System.out::println); 
    } 
} 
+0

Мне это нравится. Я не пробовал реализовать его, но мне интересно, как это будет сравниваться с использованием темы в решении JohnWowUs? –

+0

Оба используют поток токенов хелпера - разница в том, как прерывается поток данных «round-robin». Что касается предмета - в решении Джона используется неограниченный объект повтора, который может или не может быть проблемой, зависит от подсчета количества страниц. Его можно безопасно заменить BehaviorSubject (я думаю, что он сделал опечатку и использовал ReplaySubject.create (1) вместо ReplaySubject.createWithSize (1) - который идентичен BehaviorSubject) –

-4

Просто идея, почему Wouldnt вам реализовать свой собственный итеративный, который повторяется над вашими страницами, а затем сделать из него наблюдателя?

Пример:

Observable.from(new Iterable<T>() { 

     @Override 
     public Iterator<T> iterator() { 
      return new Iterator<T>() { 
       @Override 
       public boolean hasNext() { 
        return hasNextPage(currentPageKey); 
       } 

       @Override 
       public T next() { 

        page = getNextPage(currentPageKey); 
        currentPageKey = page.getKey(); 
        return page; 
       } 

       @Override 
       public void remove() { 
        throw new UnsupportedOperationException(); 
       } 
      }; 
     } 
    }); 

более элегантный способ будет сделать вашу страницу менеджер (переменным сканер в вашем примере кода я верю) осуществлять Iterable и писать логику итерации там.

+0

Это не работает, поскольку вы просто понимаете синхронный Итерируемый и обертывающий его в Observable. На самом деле проблема в том, что «getNextPage» возвращает Observable, поэтому Iterator никогда не узнает, имеет ли он hasNext. –

1

Ответ от JohnWowUs велик и помог мне понять, как эффективно избежать рекурсии, но были некоторые моменты, которые я все еще путал, поэтому я публикую свою измененную версию.

Резюме:

  • Отдельные страницы возвращаются в Single.
  • Используйте Flowable для потоковой передачи каждого из элементов, содержащихся на страницах. Это означает, что абоненты нашей функции не должны знать об отдельных страницах и могут просто собирать содержащиеся элементы.
  • Используйте BehaviorProcessor для начала первой страницы и выберите каждую последующую страницу после того, как мы проверили текущую страницу, если доступна следующая страница.
  • Ключ в том, что вызов processor.onNext(int) начинается с следующей итерации.

Этот код зависит от rxjava и reactive-streams.

import java.util.ArrayList; 
import java.util.Arrays; 
import java.util.List; 
import java.util.Optional; 
import java.util.function.Function; 

import io.reactivex.Flowable; 
import io.reactivex.Single; 
import io.reactivex.processors.BehaviorProcessor; 

public class Pagination { 

    // Fetch all pages and return the items contained in those pages, using the provided page fetcher function 
    public static <T> Flowable<T> fetchItems(Function<Integer, Single<Page<T>>> fetchPage) { 
     // Processor issues page indices 
     BehaviorProcessor<Integer> processor = BehaviorProcessor.createDefault(0); 
     // When an index number is issued, fetch the corresponding page 
     return processor.concatMap(index -> fetchPage.apply(index).toFlowable()) 
         // when returning the page, update the processor to get the next page (or stop) 
         .doOnNext(page -> { 
          if (page.hasNext()) { 
           processor.onNext(page.getNextPageIndex()); 
          } else { 
           processor.onComplete(); 
          } 
         }) 
         .concatMapIterable(Page::getElements); 
    } 

    public static void main(String[] args) { 
     fetchItems(Pagination::examplePageFetcher).subscribe(System.out::println); 
    } 

    // A function to fetch a page of our paged data 
    private static Single<Page<String>> examplePageFetcher(int index) { 
     return Single.just(pages.get(index)); 
    } 

    // Create some paged data 
    private static ArrayList<Page<String>> pages = new ArrayList<>(3); 

    static { 
     pages.add(new Page<>(Arrays.asList("one", "two"), Optional.of(1))); 
     pages.add(new Page<>(Arrays.asList("three", "four"), Optional.of(2))); 
     pages.add(new Page<>(Arrays.asList("five"), Optional.empty())); 
    } 

    static class Page<T> { 
     private List<T> elements; 
     private Optional<Integer> nextPageIndex; 

     public Page(List<T> elements, Optional<Integer> nextPageIndex) { 
      this.elements = elements; 
      this.nextPageIndex = nextPageIndex; 
     } 

     public List<T> getElements() { 
      return elements; 
     } 

     public int getNextPageIndex() { 
      return nextPageIndex.get(); 
     } 

     public boolean hasNext() { 
      return nextPageIndex.isPresent(); 
     } 
    } 
} 

Выход:

one 
two 
three 
four 
five 
Смежные вопросы