2015-05-22 2 views
2

У меня есть вариант использования, где я хочу многократно вызывать веб-API для каждого пользователя, пока не будут загружены все данные.Вызовите API повторно, пока не будут загружены все данные

Веб-API, который у меня есть, позволяет получить максимум 100 записей для пользователя по запросу API. Я могу указать startTime и конец Отметка записей будет загрузить для этого пользователя:

void downloadRecord(String userId, recordStartTime, recordEndTime, int countOfRecord, ResultCallBack) 

Если количество записей между recordStartTime и recordEndTime более чем 100, то ответ API возвращает только 100 записей. И затем мне нужно перебирать этот вызов с новым временем начала (время только что загруженной 100-й записи), пока не будут загружены все записи.

final long recordStartTime = timeStampFrom2DaysAgo//; 
Observable.from(arrayListOfUserIds).flatMap(new Func1<String, Observable<?>>() { 
    @Override 
    public Observable<?> call(String userId) { 
     long recordEndTime = getCurrentTimeMS(); 
     //keep downloading records 
     downloadRecord(userId, recordStartTime, recordEndTime, 100, new ResultCallBack() { 
      //if records are < 100 then stop 
      //else keep downloading 
     }); 
    } 
}).subscribe(); 

Просьба указать, есть ли пример кода RxJava, который я могу использовать для решения моей проблемы.

Благодаря

+0

Я думаю, что это идеальный прецедент для Observable.generate (https://msdn.microsoft.com/en-us/library/system.reactive.linq.observable.generate). К сожалению, я боюсь, что у RxJava нет такого метода :( –

ответ

2

Это может усложниться: посмотреть пример GIST here.

В принципе, вам нужно запросить батуте запросы относительно последующих окон времени. Обратите внимание, что BufferUntilSubscriber является внутренним, но RxJava не имеет официального варианта Subject, который кэширует значения до тех пор, пока не произойдет подписка, а затем сбросит кеш.

Редактировать: обновленный gist, так что он не вызывает MissinBackpressureException.

+0

Спасибо за ваш ответ. Как вы сказали, это действительно сложно. Чтобы понять ваше решение, мне нужно глубоко изучить RxJava. Я еще не был там :) –

4

Это случай циркулярно зависимых наблюдений. Представьте, что у вас есть одно наблюдение за запросами и одно наблюдение за ответами. requests Observable испускает некоторые объекты DownloadParams, содержащие userId, recordStartTime, recordEndTime и countOfRecord, следовательно Observable<DownloadParams> requests. responses Наблюдаемый испускает список записей, то есть Observable<List<Record>> responses.

responses, очевидно, зависит от requests, но не столь очевидная часть является то, что нам нужно requests также зависеть от responses, потому что следующая загрузка с сервера, с определенной DownloadParams, зависит от того, какой ответ мы получили от предыдущей загрузки. Для полноты, requests фактически также зависит от некоторой инициализации. Наблюдаемый, который испускает userId для выполнения первой загрузки. Вы можете заменить эту инициализацию Observable всего на .startWith(firstDownloadParams).

В любом случае, твердая часть выражает циклическую зависимость. Хорошая новость заключается в том, что это возможно в Rx, и было the focus of Cycle.js framework based on RxJS. Плохая новость заключается в том, что мы не можем решить это без Субъектов, что может быть довольно нежелательным.

Лучше сохранить код RxJava как можно более функциональным, но, пытаясь сделать это, мы достигнем проблемы. Если попытаться объявить Наблюдаемые в зависимости от других, мы получаем:

Observable<DownloadParams> requests = responses.flatMap(/* ... */) 
    .startWith(firstDownloadParams); 
Observable<List<Record>> responses = requests.flatMap(/* ... */); 

Это не компилируется, потому что первая декларация нуждается во второй декларации, и наоборот. Вот как могут помочь предметы.Мы заявляем, либо одну из этих наблюдаемых как предмет:

PublishSubject<List<Record>> responsesProxy = PublishSubject.create(); 
Observable<DownloadParams> requests = responsesProxy.flatMap(/* ... */) 
    .startWith(firstDownloadParams); 
Observable<List<Record>> responses = requests.flatMap(/* ... */); 

Субъект responsesProxy будет действовать в качестве прокси-сервера для responses, так что мы можем объявить requests в зависимости от responsesProxy. Но теперь нам нужно responsesProxy, чтобы имитировать responses. Мы делаем это, добавив следующее:

responses.subscribe(responsesProxy); 

Это замыкает контур в круговой зависимости, но только не забудьте правильно распоряжаться и Субъект, так как мы сделали подписку выше.

Теперь нам просто нужно заполнить преобразования в этих flatMap. В requests.flatMap() вы должны выполнить сетевой вызов, который загружает список записей. Вероятно, вы хотите обработать это как наблюдаемое, а не как обратный вызов, чтобы облегчить взаимодействие с остальной частью кода RxJava.

В responsesProxy.flatMap() вы должны проверить, не превышает ли список записей 100 или более, и создать следующий DownloadParams с новой записьюStartTime, завернутый как Observable.just(newDownloadParams). Если значение меньше 100, затем верните Observable.empty().

Смежные вопросы