Это случай циркулярно зависимых наблюдений. Представьте, что у вас есть одно наблюдение за запросами и одно наблюдение за ответами. requests
Observable испускает некоторые объекты DownloadParams
, содержащие userId, recordStartTime, recordEndTime и countOfRecord, следовательно Observable<DownloadParams> requests
. responses
Наблюдаемый испускает список записей, то есть Observable<List<Record>> responses
.
responses
, очевидно, зависит от requests
, но не столь очевидная часть является то, что нам нужно requests
также зависеть от responses
, потому что следующая загрузка с сервера, с определенной DownloadParams
, зависит от того, какой ответ мы получили от предыдущей загрузки. Для полноты, requests
фактически также зависит от некоторой инициализации. Наблюдаемый, который испускает userId для выполнения первой загрузки. Вы можете заменить эту инициализацию Observable всего на .startWith(firstDownloadParams)
.
В любом случае, твердая часть выражает циклическую зависимость. Хорошая новость заключается в том, что это возможно в Rx, и было the focus of Cycle.js framework based on RxJS. Плохая новость заключается в том, что мы не можем решить это без Субъектов, что может быть довольно нежелательным.
Лучше сохранить код RxJava как можно более функциональным, но, пытаясь сделать это, мы достигнем проблемы. Если попытаться объявить Наблюдаемые в зависимости от других, мы получаем:
Observable<DownloadParams> requests = responses.flatMap(/* ... */)
.startWith(firstDownloadParams);
Observable<List<Record>> responses = requests.flatMap(/* ... */);
Это не компилируется, потому что первая декларация нуждается во второй декларации, и наоборот. Вот как могут помочь предметы.Мы заявляем, либо одну из этих наблюдаемых как предмет:
PublishSubject<List<Record>> responsesProxy = PublishSubject.create();
Observable<DownloadParams> requests = responsesProxy.flatMap(/* ... */)
.startWith(firstDownloadParams);
Observable<List<Record>> responses = requests.flatMap(/* ... */);
Субъект responsesProxy
будет действовать в качестве прокси-сервера для responses
, так что мы можем объявить requests
в зависимости от responsesProxy
. Но теперь нам нужно responsesProxy
, чтобы имитировать responses
. Мы делаем это, добавив следующее:
responses.subscribe(responsesProxy);
Это замыкает контур в круговой зависимости, но только не забудьте правильно распоряжаться и Субъект, так как мы сделали подписку выше.
Теперь нам просто нужно заполнить преобразования в этих flatMap
. В requests.flatMap()
вы должны выполнить сетевой вызов, который загружает список записей. Вероятно, вы хотите обработать это как наблюдаемое, а не как обратный вызов, чтобы облегчить взаимодействие с остальной частью кода RxJava.
В responsesProxy.flatMap()
вы должны проверить, не превышает ли список записей 100 или более, и создать следующий DownloadParams
с новой записьюStartTime, завернутый как Observable.just(newDownloadParams)
. Если значение меньше 100, затем верните Observable.empty()
.
Я думаю, что это идеальный прецедент для Observable.generate (https://msdn.microsoft.com/en-us/library/system.reactive.linq.observable.generate). К сожалению, я боюсь, что у RxJava нет такого метода :( –