У меня есть интересный вопрос для экспертов Rx. У меня есть реляционная таблица, содержащая информацию о событиях. Событие состоит из идентификатора, типа и времени. В моем коде мне нужно получить все события в определенном, потенциально широком, временном диапазоне.Дозирование больших результирующих наборов с использованием Rx
SELECT * FROM events WHERE event.time > :before AND event.time < :after ORDER BY time LIMIT :batch_size
Чтобы повысить надежность и иметь дело с большими наборами результатов, я запрашиваю записи в партиях размером: batch_size. Теперь я хочу написать функцию, которая, если: before и: after, вернет Observable, представляющий набор результатов.
Observable<Event> getEvents(long before, long after);
Внутренне функция должна запрашивать базу данных партиями. Распределение событий по шкале времени неизвестно. Таким образом, естественным способом обработки пакетных операций является следующее: извлекает первые N записей , если результат не пуст, используйте последнее время записи как новый параметр «before» и выберите следующие N записей; в противном случае завершите , если результат не пуст, используйте последнее время записи как новый параметр «before» и выберите следующие N записей; в противном случае прекратить ... и так далее (идея должна быть ясна)
Мой вопрос:
Есть ли способ, чтобы выразить эту функцию в терминах более высокого уровня наблюдаемыми примитивов (фильтр/карте/flatMap/scan/range и т. д.), не используя подписчиков явно?
До сих пор, я не в состоянии сделать это, и придумать следующий простой код вместо:
private void observeGetRecords(long before, long after, Subscriber<? super Event> subscriber) {
long start = before;
while (start < after) {
final List<Event> records;
try {
records = getRecordsByRange(start, after);
} catch (Exception e) {
subscriber.onError(e);
return;
}
if (records.isEmpty()) break;
records.forEach(subscriber::onNext);
start = Iterables.getLast(records).getTime();
}
subscriber.onCompleted();
}
public Observable<Event> getRecords(final long before, final long after) {
return Observable.create(subscriber -> observeGetRecords(before, after, subscriber));
}
Здесь getRecordsByRange реализует запрос на выборку с помощью DBI и возвращает список. Этот код работает отлично, но не обладает элегантностью высокоуровневых конструкций Rx.
NB: Я знаю, что я могу вернуть Iterator в результате запроса SELECT в DBI. Однако я не хочу этого делать и предпочитаю запускать несколько запросов. Это вычисление не должно быть атомарным, поэтому проблемы изоляции транзакций не актуальны.
Я не совсем понимаю, почему вы хотите, чтобы продолжить с момента последней партии. если tstart, если данные в этом диапазоне заканчиваются на tdata, почему вы хотите запросить из tdata? На данный момент нет данных, доступных между tdata и тенденциями. – akarnokd
Вот почему я проверяю, пуст ли «список записей», и прерывается ли из цикла, если это так. – GrumpyCat