2015-11-03 2 views
0

У меня есть интересный вопрос для экспертов Rx. У меня есть реляционная таблица, содержащая информацию о событиях. Событие состоит из идентификатора, типа и времени. В моем коде мне нужно получить все события в определенном, потенциально широком, временном диапазоне.Дозирование больших результирующих наборов с использованием Rx

SELECT * FROM events WHERE event.time > :before AND event.time < :after ORDER  BY time LIMIT :batch_size 

Чтобы повысить надежность и иметь дело с большими наборами результатов, я запрашиваю записи в партиях размером: batch_size. Теперь я хочу написать функцию, которая, если: before и: after, вернет Observable, представляющий набор результатов.

Observable<Event> getEvents(long before, long after); 

Внутренне функция должна запрашивать базу данных партиями. Распределение событий по шкале времени неизвестно. Таким образом, естественным способом обработки пакетных операций является следующее: извлекает первые N записей , если результат не пуст, используйте последнее время записи как новый параметр «before» и выберите следующие N записей; в противном случае завершите , если результат не пуст, используйте последнее время записи как новый параметр «before» и выберите следующие N записей; в противном случае прекратить ... и так далее (идея должна быть ясна)

Мой вопрос:

Есть ли способ, чтобы выразить эту функцию в терминах более высокого уровня наблюдаемыми примитивов (фильтр/карте/flatMap/scan/range и т. д.), не используя подписчиков явно?

До сих пор, я не в состоянии сделать это, и придумать следующий простой код вместо:

private void observeGetRecords(long before, long after, Subscriber<? super Event> subscriber) { 
    long start = before; 
    while (start < after) { 
     final List<Event> records; 
     try { 
      records = getRecordsByRange(start, after); 
     } catch (Exception e) { 
      subscriber.onError(e); 
      return; 
     } 
     if (records.isEmpty()) break; 
     records.forEach(subscriber::onNext); 
     start = Iterables.getLast(records).getTime(); 
    } 

    subscriber.onCompleted(); 
} 

public Observable<Event> getRecords(final long before, final long after) { 
     return Observable.create(subscriber -> observeGetRecords(before, after, subscriber)); 
} 

Здесь getRecordsByRange реализует запрос на выборку с помощью DBI и возвращает список. Этот код работает отлично, но не обладает элегантностью высокоуровневых конструкций Rx.

NB: Я знаю, что я могу вернуть Iterator в результате запроса SELECT в DBI. Однако я не хочу этого делать и предпочитаю запускать несколько запросов. Это вычисление не должно быть атомарным, поэтому проблемы изоляции транзакций не актуальны.

+0

Я не совсем понимаю, почему вы хотите, чтобы продолжить с момента последней партии. если tstart, если данные в этом диапазоне заканчиваются на tdata, почему вы хотите запросить из tdata? На данный момент нет данных, доступных между tdata и тенденциями. – akarnokd

+0

Вот почему я проверяю, пуст ли «список записей», и прерывается ли из цикла, если это так. – GrumpyCat

ответ

0

Хотя я не совсем понимаю, почему вы хотите такой временной повторное использование, вот как я бы это сделать:

BehaviorSubject<Long> start = BehaviorSubject.create(0L); 

start 
.subscribeOn(Schedulers.trampoline()) 
.flatMap(tstart -> 
    getEvents(tstart, tstart + twindow) 
    .publish(o -> 
     o.takeLast(1) 
     .doOnNext(r -> start.onNext(r.time)) 
     .ignoreElements() 
     .mergeWith(o) 
    ) 
) 
.subscribe(...) 
+0

Спасибо, я обязательно должен изучить темы, которые раньше их не использовали. – GrumpyCat

+0

Кстати, что вы подразумеваете под временным использованием? Я просто хочу, чтобы пакетные запросы SQL были непрозрачными для клиента функции getRecords. – GrumpyCat

+0

Я имел в виду, что вы используете вывод запроса для параметризации следующего запроса. – akarnokd