2015-11-14 3 views
8

Мне нужно запросить базу данных своевременно, чтобы узнать состояние устаревшей системы. Я думал об упаковке запроса вокруг Observable, но я не знаю, как правильно это сделать.Опрос базы данных с реактивными расширениями

В принципе, это будет тот же запрос каждые 5 секунд. Но я боюсь, что мне придется столкнуться с этими проблемами:

  • Что делать, если выполнение запроса занимает 10 секунд? Я не хочу, чтобы выполнял любой новый запрос, если предыдущий все еще обрабатывается.
  • Также должен быть тайм-аут. Если текущий запрос не выполняет после, например, 20 секунд, информационное сообщение должно быть зарегистрировано в и должна быть отправлена ​​новая попытка (тот же запрос).

Дополнительные детали:

  • Запрос является просто SELECT, который возвращает набор данных со списком кодов состояния (рабочий, нарушенными).
  • Наблюдаемая последовательность всегда будет принимать последние данные, полученные от запроса, что-то вроде метода расширения коммутатора.
  • Я хотел бы обернуть запрос базы данных (операция с длительностью) в задачу, но я не уверен, что это лучший вариант.

Я почти уверен, что запрос должен быть выполнен в другом потоке, но я не знаю, как выглядит наблюдаемое, когда-либо прочитав Introduction to Rx by Lee Campbell.

+0

Вы можете добавить более подробную информацию? Какие данные возвращает запрос? запрос возвращает один объект? в случае таймаута вы говорите, что хотите, чтобы новый запрос запускался, что это за запрос? –

ответ

14

Это довольно классический случай использования Rx для опроса другой системы. Большинство людей будут использовать Observable.Interval в качестве своего оператора, и для большинства это будет хорошо.

Однако у вас есть особые требования к таймаутам и повторным попыткам. В этом случае я думаю, что вы лучше использовать комбинацию операторов:

  • Observable.Timer, чтобы позволить вам выполнить ваш запрос в указанное время
  • Timeout для идентификации и запросов к базе данных, которые перерасход
  • ToObservable() к сопоставьте полученные результаты Task с наблюдаемой последовательностью.
  • Retry, чтобы вы могли восстановить после тайм-аутов
  • Repeat, чтобы вы могли продолжить работу после успешных запросов к базе данных. Это также сохранит этот начальный период/промежуток между завершением предыдущего запроса к базе данных и началом следующего.

Эта рабочая LINQPad сниппет должен показать вам запрос работает правильно:

void Main() 
{ 
    var pollingPeriod = TimeSpan.FromSeconds(5); 
    var dbQueryTimeout = TimeSpan.FromSeconds(10); 

    //You will want to have your Rx query timeout after the expected silence of the timer, and then further maximum silence. 
    var rxQueryTimeOut = pollingPeriod + dbQueryTimeout; 

    var scheduler = new EventLoopScheduler(ts => new Thread(ts) { Name = "DatabasePoller" }); 

    var query = Observable.Timer(pollingPeriod, scheduler) 
        .SelectMany(_ => DatabaseQuery().ToObservable()) 
        .Timeout(rxQueryTimeOut, Observable.Return("Timeout"), scheduler) 
        .Retry() //Loop on errors 
        .Repeat(); //Loop on success 

    query.StartWith("Seed") 
     .TimeInterval(scheduler) //Just to debug, print the timing gaps. 
     .Dump(); 
} 

// Define other methods and classes here 
private static int delay = 9; 
private static int delayModifier = 1; 
public async Task<string> DatabaseQuery() 
{ 
    //Oscillate the delay between 3 and 12 seconds 
    delay += delayModifier; 
    var timespan = TimeSpan.FromSeconds(delay); 
    if (delay < 4 || delay > 11) 
     delayModifier *= -1; 
    timespan.Dump("delay"); 
    await Task.Delay(timespan); 
    return "Value"; 
} 

Результаты выглядят:

Seed 00:00:00.0125407 
Timeout 00:00:15.0166379 
Timeout 00:00:15.0124480 
Timeout 00:00:15.0004520 
Timeout 00:00:15.0013296 
Timeout 00:00:15.0140864 
Value 00:00:14.0251731 
Value 00:00:13.0231958 
Value 00:00:12.0162236 
Value 00:00:11.0138606 

Ключевая часть образца ....

var query = Observable.Timer(TimeSpan.FromSeconds(5), scheduler) 
       .SelectMany(_ => DatabaseQuery().ToObservable()) 
       .Timeout(rxQueryTimeOut, Observable.Return("Timeout"), scheduler) 
       .Retry() //Loop on errors 
       .Repeat(); //Loop on success 

EDIT: Вот еще одно объяснение того, как прийти к этому решению. https://github.com/LeeCampbell/RxCookbook/blob/master/Repository/Polling.md

+0

Является ли основная цель использования EventLoopScheduler здесь, чтобы убедиться, что запрос выполняется в том же потоке? Является ли это лучшим способом для опроса другой системы с использованием RX? – jumpercake

+0

Правильно, это намерение здесь. В этом случае я бы предложил сделать так, чтобы вы не конкурировали с пулом задач/потоков. Как я называю Thread в этом примере, большинство продуктов регистрации также выставляют это тоже. Однако использование EventLoopScheduler здесь не является обязательным, Rx будет поддерживать сериализацию работы. –

+0

Обновлено, чтобы включить ссылку на образец кода с результатами, необходимыми для реализации –

1

Я думаю, что это то, что вы должны сделать:

var query = 
    from n in Observable.Interval(TimeSpan.FromSeconds(5.0)) 
    from ds in Observable.Amb(
     Observable.Start(() => /* Your DataSet query */), 
     Observable 
      .Timer(TimeSpan.FromSeconds(10.0)) 
      .Select(_ => new DataSet("TimeOut"))) 
    select ds; 

Это вызывает новый запрос с интервалом между расстрелы 5 секунд. Это не 5 секунд с момента последнего, это 5 секунд с тех пор, как последний закончился.

Затем вы попробуете ваш запрос, но вы получите .Amb с таймером, который возвращает DataSet через 10 секунд. Если ваш запрос заканчивается до 10 секунд, он выигрывает, но в противном случае возвращается специальный DataSet. Оператор .Amb в основном является оператором «гонки» - первым наблюдаемым для получения выигрыша.

+0

Ничего себе, комбинация наблюдаемых впечатляет! Вы имеете в виду, что оба наблюдаемых будут гоняться, и Amb получит первое место. То, что меня теряет, - это до 2 вложенных из класов. Является ли эта часть той, которая разрешает ту часть, в которой вы говорите: «Это вызывает новый запрос с интервалом между исполнением 5 секунд. Это не 5 секунд с момента последнего запуска, а 5 секунд с момента окончания последнего».? – SuperJMN

+1

@SuperJMN - Спасибо. 'Observable.Interval (TimeSpan.FromSeconds (5.0))' срабатывает только через 5 секунд после того, как все подписчики завершили свою работу. Поэтому, если вторая часть запроса работает, интервал не срабатывает до 5 секунд после его завершения. – Enigmativity

+0

I _think_, поскольку вы используете 'SelectMany', чтобы потреблять свой' Observable.Interval', который не является ничем _блоком_ потребления, поэтому он будет указывать каждые 5 секунд, то есть не один раз, когда запрос БД завершен. –

Смежные вопросы