2015-08-12 2 views
3

Я пытаюсь создать бесконечную прокрутку в своем приложении для Android с использованием противодавления в rx Java. Я хочу, чтобы он вызывал внешнее обслуживание только запрашиваемое количество раз (после вызова request(1)). Но после использования плоской карты каждый subscribe загружает 16 страниц.RxJava Противодавление и количество звонков продюсеру

ниже моего кода с ожидаемыми результатами. Почти каждый тест терпит неудачу из первого запроса (п = 16)

import org.junit.Before; 
import org.junit.Test; 
import org.mockito.Mockito; 
import rx.Observable; 
import rx.observers.TestSubscriber; 

import java.util.Arrays; 
import java.util.List; 
import java.util.concurrent.atomic.AtomicInteger; 

import static java.util.Collections.emptyList; 
import static org.mockito.Mockito.*; 
import static rx.internal.util.UtilityFunctions.identity; 

public class ServiceObservablesTest { 


    public static <T> Observable<List<T>> create(DataProvider<T> dataProvider) { 
     Observable<Observable<List<T>>> metaObservalble = Observable.create(subscriber -> { 
      AtomicInteger pageNumber = new AtomicInteger(); 
      subscriber.setProducer(n -> { 
       // at subscribe rxJava makes request for 16 elements - probably because of flatMap 
       // after first request with 16 elements everything seems to work fine even if i ignore the 'n' param 

       Observable<List<T>> page = dataProvider.requestPage(pageNumber.getAndIncrement()); 
       subscriber.onNext(page); 

      }); 
     }); 
     return metaObservalble.flatMap(identity()).takeWhile(page -> !page.isEmpty()); 
    } 

    public interface DataProvider<T> { 
     Observable<List<T>> requestPage(int page); 
    } 


    private DataProvider provider; 

    @Before 
    public void setUp() throws Exception { 
     provider = Mockito.mock(DataProvider.class); 
     List<Object> list = Arrays.asList(new Object()); 
     when(provider.requestPage(anyInt())).thenReturn(Observable.just(list)); 
    } 

    @Test 
    public void shouldRequestOnlyFirstPageOnSubscribe() { 
     //given 

     TestSubscriber<List<Object>> subscriber = new TestSubscriber<>(1); 
     Observable<List<Object>> flightsObservable = create(provider); 

     //when 
     flightsObservable.subscribe(subscriber); 

     //then 
     subscriber.assertValueCount(1); 
     subscriber.assertNotCompleted(); 

     verify(provider, times(1)).requestPage(0); 
     verify(provider, never()).requestPage(1); 
    } 


    @Test 
    public void shouldRequestNumberOfPagesSpecified() { 
     //given 

     int requested_pages = 5; 
     TestSubscriber<List<Object>> subscriber = new TestSubscriber<>(0); 
     Observable<List<Object>> flightsObservable = create(provider); 

     //when 
     flightsObservable.subscribe(subscriber); 
     subscriber.requestMore(requested_pages); 

     //then 
     subscriber.assertValueCount(requested_pages); 
     subscriber.assertNotCompleted(); 


     for (int i = 0; i < requested_pages; i++) { 
      verify(provider, times(1)).requestPage(i); 
     } 
     verify(provider, never()).requestPage(requested_pages); 

    } 


    @Test 
    public void shouldCompleteAfterRetrievingEmptyResult() { 
     //given 

     int emptyPage = 2; 
     when(provider.requestPage(emptyPage)).thenReturn(Observable.just(emptyList())); 

     TestSubscriber<List<Object>> subscriber = new TestSubscriber<>(100); 
     Observable<List<Object>> flightsObservable = create(provider); 


     //when 
     flightsObservable.subscribe(subscriber); 

     //then 
     subscriber.assertValueCount(emptyPage); 
     subscriber.assertCompleted(); 


     verify(provider, times(1)).requestPage(0); //requested at subscribe 
     for (int i = 1; i <= emptyPage; i++) { 
      verify(provider, times(1)).requestPage(i); 
     } 
     verify(provider, never()).requestPage(emptyPage + 1); 

    } 

    @Test 
    public void shouldRequestNextPageWhenRequestedMore() { 
     //given 

     TestSubscriber<List<Object>> subscriber = new TestSubscriber<>(1); 
     Observable<List<Object>> flightsObservable = create(provider); 

     //when 
     flightsObservable.subscribe(subscriber); 
     subscriber.requestMore(1); 

     //then 
     subscriber.assertValueCount(2); 
     verify(provider, times(1)).requestPage(0); 
     verify(provider, times(1)).requestPage(1); 
     verify(provider, never()).requestPage(2); 

     //when 
     subscriber.requestMore(1); 

     //then 
     subscriber.assertValueCount(3); 
     subscriber.assertNotCompleted(); 

     verify(provider, times(1)).requestPage(0); 
     verify(provider, times(1)).requestPage(1); 
     verify(provider, times(1)).requestPage(2); 
     verify(provider, never()).requestPage(3); 

    } 

    @Test 
    public void shouldWorkWithMultipleSubscribers() { 

     //given 

     TestSubscriber<List<Object>> subscriber1 = new TestSubscriber<>(1); 
     TestSubscriber<List<Object>> subscriber2 = new TestSubscriber<>(1); 
     Observable<List<Object>> flightsObservable = create(provider); 

     //when 
     flightsObservable.subscribe(subscriber1); 
     flightsObservable.subscribe(subscriber2); 

     //then 
     subscriber1.assertValueCount(1); 
     subscriber2.assertValueCount(1); 

     verify(provider, times(2)).requestPage(0); 
     verify(provider, never()).requestPage(1); 

     //when 
     subscriber1.requestMore(1); 
     //then 
     subscriber1.assertValueCount(2); 
     subscriber2.assertValueCount(1); 

     verify(provider, times(2)).requestPage(0); 
     verify(provider, times(1)).requestPage(1); 
     verify(provider, never()).requestPage(2); 

     //when 
     subscriber2.requestMore(1); 
     //then 
     subscriber1.assertValueCount(2); 
     subscriber2.assertValueCount(2); 

     verify(provider, times(2)).requestPage(0); 
     verify(provider, times(2)).requestPage(1); 
     verify(provider, never()).requestPage(2); 
    } 

} 
+0

Будет 'DataProvider.requestPage (интермедиат)' график работы, используя нить, будущее, 'observeOn (Scheduler)' или 'subscribeOn (Scheduler)'? – Aaron

+0

да, я использую 'subscribeOn' и' observOn' –

+0

Ваш metaObservable неправильно выполняет противодавление и не завершается. По умолчанию flatMap/merge подписывается только на 16 наблюдаемых одновременно и не будет двигаться дальше, если они не закончатся. – akarnokd

ответ

3

давления Назад предназначено для ведения переговоров параллельного поведения производителя потребительского и позволить автору программы установить стратегии решения, что делать, когда скорость данных производится превышает скорость потребления данных.

При этом вы увидите, что операторы, объединяющие наблюдаемые данные, такие как merge, предоставят запрашиваемую сумму, которая не соответствует требуемому количеству данных. Внешнее наблюдаемое (Наблюдаемое Наблюдение) всегда будет получать запрос на 16 на RxAndroid (128 в RxJava) при слиянии. Затем, когда он получает внутренние Observables of List, каждый внутренний наблюдаемый получит запрос, основанный на запрашиваемой сумме от подписчика вниз по течению. Если вы попытаетесь написать Observable<Observable<T>>, вы будете вынуждены написать функцию OnSubscribe<Observable<List<T>>>, которая внутренне управляет слиянием, чтобы она была Observable<List<T>> вместо Observable<Observable<List<T>>. Написание этого заставит вас подписаться на наблюдаемый, возвращаемый вашим поставщиком данных, чтобы разворачивать и на следующем. List<T>.

Я предлагаю вам вместо этого отображать экранные позиции y в события конца страницы, а затем использовать сканирование, чтобы преобразовать его в монотонно увеличивающееся число, а затем concatMap это число в вызов DataProvider.requestPage().

screenYPositions 
    .map(this::isUninitializedOrNearEndOfPage) 
    .scan(1, (event, pageNumber) -> pageNumber + 1) 
    .concatMap(dataProvider::requestPage) 
    .subscribe(testSubscriber); 
+0

на Android Я считаю, что размер буфера по умолчанию - 16, а не 128 –

+0

А это была моя теория. Спасибо Дэйву! – Aaron

+0

Итак, слияние предварительно загружает данные, и нет способа контролировать, сколько раз будет вызываться производитель? –

Смежные вопросы