2015-08-25 2 views
3

Я ищу что-то вроде Semaphore, но это разрешает после того, как все слоты выпущены.Асинхронно ждать появления n вещей

Что-то вроде этого:

use semaphore = new SemaphoreSlim(0,100) 

anEvent.add(fun _ -> semaphore.Release(1) |> ignore); 

async { 
    do! thingThatCausesAnEventToFire100Times() 
    //where 100 is the available slots instead of the timeout. 
    let! thingsHappened = semaphore.WaitAsync(100) |> Async.AwaitTask 
    thingsHappened |> should be True 
} 
+0

О да, если вы знаете ответ в C#, это тоже хорошо: P – albertjan

+0

семафоры и прочее могут возникнуть проблемы с асинхронным (разные потоки), чтобы в первую очередь я просто пошел простой замок/ref counter/manualresetevent – Carsten

+0

@ Carsten Семафоры предназначены для асинхронного/многопоточного использования. Из msdn для 'SemaphoreSlim':« Представляет облегченную альтернативу Семафору, которая ограничивает количество __threads__, которые могут одновременно обращаться к ресурсу или пулу ресурсов ». – albertjan

ответ

3

Похоже, работа для MailboxProcessor. Как об этом:

type SemaphoreCommand = 
    |Release 
    |Wait of AsyncReplyChannel<unit> 

let semaphore slots = 
    Agent.Start 
    <| fun inbox -> 
     let rec loop c (w:AsyncReplyChannel<unit> list) = 
      async { 
      let! command = inbox.Receive() 
      match command with 
      |Release -> if (c + 1) = slots then w |> List.iter(fun t -> t.Reply()) 
         else return! loop (c + 1) w 
      |Wait a -> return! loop c (a::w) 
     } 
     loop 0 [] 

let slotWaiter = semaphore 100 

//Events will fill up slots 
Release |> slotWaiter.Post 
Release |> slotWaiter.Post 

async{ 
    //Wait for all slots to be filled 
    do! slotWaiter.PostAndAsyncReply(fun t -> Wait t) 
    //All slots filled - continue 
} 

Я не обрабатывает случай, когда вы можете не зарегистрировал AsyncReplyChannel к тому времени заполнены все слоты или сброса, когда все слоты заполнены, но это довольно тривиально, и я оставьте это как упражнение для читателя :)

+0

Спасибо, я посмотрю, насколько тривиальным это оказывается :). Я бы не подумал о «MailBoxProcessor» для этого, но я выгляжу хорошо. – albertjan

+1

'w |> List.iter (fun t -> t.Reply())' is no-op, когда есть 'w = []', поэтому нет причин для объяснения того, что отсутствующее сокращение AsyncReplyChannel легко достигается потому, что 'MailboxProcessor'' IDisposable', поэтому вы можете использовать 'use slotAwaiter = semphore 100' – albertjan

Смежные вопросы