2011-01-17 3 views
9

Я пытаюсь переписать некоторый код с помощью Reactive Extensions для .NET, но мне нужно некоторое руководство по достижению моей цели.Использование Rx для блокировки (и, возможно, таймаута) в асинхронной операции

У меня есть класс, который инкапсулирует некоторое асинхронное поведение в библиотеке низкого уровня. Подумайте, что либо читает, либо пишет сеть. Когда класс запущен, он попытается подключиться к среде, и когда он будет успешным, он будет сигнализировать об этом, вызвав из рабочего потока.

Я хочу, чтобы превратить это асинхронное поведение в синхронном вызов, и я создал очень упрощенный пример ниже о том, что может быть достигнуто:

ManualResetEvent readyEvent = new ManualResetEvent(false); 

public void Start(TimeSpan timeout) { 
    // Simulate a background process 
    ThreadPool.QueueUserWorkItem(_ => AsyncStart(TimeSpan.FromSeconds(1))); 
    // Wait for startup to complete. 
    if (!this.readyEvent.WaitOne(timeout)) 
    throw new TimeoutException(); 
} 

void AsyncStart(TimeSpan delay) { 
    Thread.Sleep(delay); // Simulate startup delay. 
    this.readyEvent.Set(); 
} 

Запуск AsyncStart на рабочем потоке это просто способ для имитации асинхронное поведение библиотеки и не является частью моего реального кода, где библиотека низкого уровня поставляет поток и вызывает мой код при обратном вызове.

Обратите внимание, что метод Start будет вызывать TimeoutException, если запуск не завершен в течение интервала времени ожидания.

Я хочу переписать этот код для использования Rx. Это моя первая попытка:

Subject<Unit> readySubject = new Subject<Unit>(); 

public void Start(TimeSpan timeout) { 
    ThreadPool.QueueUserWorkItem(_ => AsyncStart(TimeSpan.FromSeconds(1))); 
    // Point A - see below 
    this.readySubject.Timeout(timeout).First(); 
} 

void AsyncStart(TimeSpan delay) { 
    Thread.Sleep(delay); 
    this.readySubject.OnNext(new Unit()); 
} 

Это приличная попытка, но, к сожалению, она содержит состояние гонки. Если запуск завершает быстро (например, если delay равно 0), и если в точке А есть добавочная задержка, то OnNext будет вызываться на readySubject до того, как First выполнил. В сущности IObservable Я применяю Timeout и First, не увидев, что запуск завершен, и вместо этого будет выбрано TimeoutException.

Кажется, что Observable.Defer был создан для решения таких проблем. Вот несколько более сложная попытка использовать Rx:

Subject<Unit> readySubject = new Subject<Unit>(); 

void Start(TimeSpan timeout) { 
    var ready = Observable.Defer(() => { 
    ThreadPool.QueueUserWorkItem(_ => AsyncStart(TimeSpan.FromSeconds(1))); 
    // Point B - see below 
    return this.readySubject.AsObservable(); 
    }); 
    ready.Timeout(timeout).First(); 
} 

void AsyncStart(TimeSpan delay) { 
    Thread.Sleep(delay); 
    this.readySubject.OnNext(new Unit()); 
} 

Теперь асинхронная операция не запускается сразу, а только тогда, когда IObservable используется. К сожалению, все еще есть условие гонки, но на этот раз в точке B. Если асинхронная операция началась, вызовите OnNext до того, как возвращается лямбда Defer, она все равно будет потеряна, а TimeoutException будет выброшен Timeout.

Я знаю, что я могу использовать операторов, например Replay, для буферизации событий, но мой первоначальный пример без Rx не использует никакой буферизации. Есть ли способ использовать Rx для решения моей проблемы без условий гонки? По существу, запуск асинхронной операции только после подключения IObservable ки First?


На основании ответа Пола Беттс здесь работает решение:

void Start(TimeSpan timeout) { 
    var readySubject = new AsyncSubject<Unit>(); 
    ThreadPool.QueueUserWorkItem(_ => AsyncStart(readySubject, TimeSpan.FromSeconds(1))); 
    // Point C - see below 
    readySubject.Timeout(timeout).First(); 
} 

void AsyncStart(ISubject<Unit> readySubject, TimeSpan delay) { 
    Thread.Sleep(delay); 
    readySubject.OnNext(new Unit()); 
    readySubject.OnCompleted(); 
} 

Самое интересное, когда есть задержка в точке С, что больше, чем время, необходимое для AsyncStart, чтобы закончить. AsyncSubject сохраняет последнее отправленное уведомление, а Timeout и First будут по-прежнему выполнять, как ожидалось.

ответ

12

Итак, одно, что нужно знать о Rx Я думаю, что многие люди сначала (включая меня!): Если вы используете какую-либо традиционную функцию потоковой передачи, такую ​​как ResetEvents, Thread.Sleeps или что-то еще, вы делаете It Wrong (tm) - это похоже на то, чтобы отличать вещи от массивов в LINQ, потому что вы знаете, что базовый тип оказывается массивом.

Главное, чтобы знать, что функция async представлена ​​функцией, которая возвращает IObservable<TResult> - это волшебный соус, который позволяет вам сигнализировать о завершении чего-либо. Так вот, как вы «Rx-римента» более традиционной асинхронной FUNC, как вы бы видеть в Silverlight веб-службы:

IObservable<byte[]> readFromNetwork() 
{ 
    var ret = new AsyncSubject(); 
    // Here's a traditional async function that you provide a callback to 
    asyncReaderFunc(theFile, buffer => { 
     ret.OnNext(buffer); 
     ret.OnCompleted(); 
    }); 

    return ret; 
} 

Это приличная попытка, но, к сожалению, содержит условие гонки.

Это где AsyncSubject приходит - это гарантирует, что даже если asyncReaderFunc бьет Подписаться на удар, AsyncSubject все равно будет «переигрывать», что случилось.

Итак, теперь, когда мы получили нашу функцию, мы можем сделать много интересных вещей к нему:

// Make it into a sync function 
byte[] results = readFromNetwork().First(); 

// Keep reading blocks one at a time until we run out 
readFromNetwork().Repeat().TakeUntil(x => x == null || x.Length == 0).Subscribe(bytes => { 
    Console.WriteLine("Read {0} bytes in chunk", bytes.Length); 
}) 

// Read the entire stream and get notified when the whole deal is finished 
readFromNetwork() 
    .Repeat().TakeUntil(x => x == null || x.Length == 0) 
    .Aggregate(new MemoryStream(), (ms, bytes) => ms.Write(bytes)) 
    .Subscribe(ms => { 
     Console.WriteLine("Got {0} bytes in total", ms.ToArray().Length); 
    }); 

// Or just get the entire thing as a MemoryStream and wait for it 
var memoryStream = readFromNetwork() 
    .Repeat().TakeUntil(x => x == null || x.Length == 0) 
    .Aggregate(new MemoryStream(), (ms, bytes) => ms.Write(bytes)) 
    .First(); 
4

Я хотел бы далее добавить к комментарию Павла добавляющих WaitHandles означает, что вы делаете это неправильно, что использование Субъектов напрямую означает, что вы тоже ошибаетесь. ;-)

Постарайтесь, чтобы ваш Rx-код работал с последовательностями или конвейерами. Субъекты предлагают возможности чтения и записи, что означает, что вы больше не работаете с конвейером или последовательностью (если у вас нет pipleines, которые идут в обоих направлениях или последовательностях, которые могут быть отменены?)?

Итак, первый код Павла довольно крут, но давайте «Rx черт возьми».

первый AsyncStart изменить способ его к этому

IObservable<Unit> AsyncStart(TimeSpan delay) 
{ 
    Observable.Timer(delay).Select(_=>Unit.Default); 
} 

Так легко! Не смотрите на предметы, и данные поступают только в одну сторону. Важным здесь является изменение подписи. Это подтолкнет нас. Это сейчас очень явное. Прохождение в Субъекте мне очень неоднозначно.

2-й. Нам теперь не нужен предмет, определенный в методе start. Мы также можем использовать функции Scheduler вместо старого ThreadPool.QueueUserWorkItem.

void Start(TimeSpan timeout) 
{ 
    var isReady = AsyncStart(TimeSpan.FromSeconds(1)) 
        .SubscribeOn(Scheduler.ThreadPool) 
        .PublishLast(); 
    isReady.Connect(); 
    isReady.Timeout(timeout).First(); 
} 

Теперь у нас есть четкий трубопровод или последовательность событий

AsyncStart -> isReady -> Начать

Вместо Пуск -> AsyncStart -> Начать

If Я знал больше о вашем проблемном пространстве, я уверен, что мы могли бы придумать еще лучший способ сделать это, что не требовало блокирующего характера метода запуска. Чем больше вы используете Rx, тем больше вы обнаружите, что ваши старые предположения, когда вам нужно блокировать, использовать waithandles и т. Д., Можно выбросить из окна.

+0

Спасибо за ответ. Вопрос был опубликован до выпуска Rx, и с тех пор поверхность API сильно изменилась.Что касается моей конкретной проблемы, я боюсь, что я не могу выбросить блокировку из окна. API, который я использовал, требовал многоступенчатой ​​инициализации, но с помощью Rx разрешил мне распараллелить часть этого. –

+0

Я считаю, что Rx действительно полезен для многоэтапной инициализации, однако я также считаю, что мне все равно не нужно блокировать. Часто у вас может быть одно событие, которое поднимается, когда все части готовы, показывая, что система готова. Затем вы можете эффективно сделать выбор (много), чтобы вызвать то, что вы в противном случае заблокировали бы до вызова. –

+0

О, ps, код, который я поставил, будет работать (в некоторых вариантах) на всех Rx-авитах за последние 3 года. –

Смежные вопросы