Я пытаюсь переписать некоторый код с помощью Reactive Extensions для .NET, но мне нужно некоторое руководство по достижению моей цели.Использование Rx для блокировки (и, возможно, таймаута) в асинхронной операции
У меня есть класс, который инкапсулирует некоторое асинхронное поведение в библиотеке низкого уровня. Подумайте, что либо читает, либо пишет сеть. Когда класс запущен, он попытается подключиться к среде, и когда он будет успешным, он будет сигнализировать об этом, вызвав из рабочего потока.
Я хочу, чтобы превратить это асинхронное поведение в синхронном вызов, и я создал очень упрощенный пример ниже о том, что может быть достигнуто:
ManualResetEvent readyEvent = new ManualResetEvent(false);
public void Start(TimeSpan timeout) {
// Simulate a background process
ThreadPool.QueueUserWorkItem(_ => AsyncStart(TimeSpan.FromSeconds(1)));
// Wait for startup to complete.
if (!this.readyEvent.WaitOne(timeout))
throw new TimeoutException();
}
void AsyncStart(TimeSpan delay) {
Thread.Sleep(delay); // Simulate startup delay.
this.readyEvent.Set();
}
Запуск AsyncStart
на рабочем потоке это просто способ для имитации асинхронное поведение библиотеки и не является частью моего реального кода, где библиотека низкого уровня поставляет поток и вызывает мой код при обратном вызове.
Обратите внимание, что метод Start
будет вызывать TimeoutException
, если запуск не завершен в течение интервала времени ожидания.
Я хочу переписать этот код для использования Rx. Это моя первая попытка:
Subject<Unit> readySubject = new Subject<Unit>();
public void Start(TimeSpan timeout) {
ThreadPool.QueueUserWorkItem(_ => AsyncStart(TimeSpan.FromSeconds(1)));
// Point A - see below
this.readySubject.Timeout(timeout).First();
}
void AsyncStart(TimeSpan delay) {
Thread.Sleep(delay);
this.readySubject.OnNext(new Unit());
}
Это приличная попытка, но, к сожалению, она содержит состояние гонки. Если запуск завершает быстро (например, если delay
равно 0), и если в точке А есть добавочная задержка, то OnNext
будет вызываться на readySubject
до того, как First
выполнил. В сущности IObservable
Я применяю Timeout
и First
, не увидев, что запуск завершен, и вместо этого будет выбрано TimeoutException
.
Кажется, что Observable.Defer
был создан для решения таких проблем. Вот несколько более сложная попытка использовать Rx:
Subject<Unit> readySubject = new Subject<Unit>();
void Start(TimeSpan timeout) {
var ready = Observable.Defer(() => {
ThreadPool.QueueUserWorkItem(_ => AsyncStart(TimeSpan.FromSeconds(1)));
// Point B - see below
return this.readySubject.AsObservable();
});
ready.Timeout(timeout).First();
}
void AsyncStart(TimeSpan delay) {
Thread.Sleep(delay);
this.readySubject.OnNext(new Unit());
}
Теперь асинхронная операция не запускается сразу, а только тогда, когда IObservable
используется. К сожалению, все еще есть условие гонки, но на этот раз в точке B. Если асинхронная операция началась, вызовите OnNext
до того, как возвращается лямбда Defer
, она все равно будет потеряна, а TimeoutException
будет выброшен Timeout
.
Я знаю, что я могу использовать операторов, например Replay
, для буферизации событий, но мой первоначальный пример без Rx не использует никакой буферизации. Есть ли способ использовать Rx для решения моей проблемы без условий гонки? По существу, запуск асинхронной операции только после подключения IObservable
ки First
?
На основании ответа Пола Беттс здесь работает решение:
void Start(TimeSpan timeout) {
var readySubject = new AsyncSubject<Unit>();
ThreadPool.QueueUserWorkItem(_ => AsyncStart(readySubject, TimeSpan.FromSeconds(1)));
// Point C - see below
readySubject.Timeout(timeout).First();
}
void AsyncStart(ISubject<Unit> readySubject, TimeSpan delay) {
Thread.Sleep(delay);
readySubject.OnNext(new Unit());
readySubject.OnCompleted();
}
Самое интересное, когда есть задержка в точке С, что больше, чем время, необходимое для AsyncStart
, чтобы закончить. AsyncSubject
сохраняет последнее отправленное уведомление, а Timeout
и First
будут по-прежнему выполнять, как ожидалось.
Спасибо за ответ. Вопрос был опубликован до выпуска Rx, и с тех пор поверхность API сильно изменилась.Что касается моей конкретной проблемы, я боюсь, что я не могу выбросить блокировку из окна. API, который я использовал, требовал многоступенчатой инициализации, но с помощью Rx разрешил мне распараллелить часть этого. –
Я считаю, что Rx действительно полезен для многоэтапной инициализации, однако я также считаю, что мне все равно не нужно блокировать. Часто у вас может быть одно событие, которое поднимается, когда все части готовы, показывая, что система готова. Затем вы можете эффективно сделать выбор (много), чтобы вызвать то, что вы в противном случае заблокировали бы до вызова. –
О, ps, код, который я поставил, будет работать (в некоторых вариантах) на всех Rx-авитах за последние 3 года. –