2016-12-21 3 views
2

Я хочу обрабатывать коллекцию заданий io-bound одновременно, но привязывать/ограничивать количество выдающихся (активно работающих) параллельных заданий.Связанные задания/дросселирование одновременных заданий, не создавая нить за задание

Chunking - это простой способ увеличить параллелизм, но создает узкие места, если предметы занимают различное количество времени.

Способ, которым я нашел для этого, имеет некоторые проблемы 1). Есть ли способ сделать это, избегая проблем ниже, оставаясь сравнимо идиоматичным и кратким?

1) используйте BlockingCollection (показано ниже). Однако это приводит к решению, в котором параллелизм здесь генерируется boundedSize количеством «потребительских» потоков. Я ищу решение, которое не требует boundedSize количества потоков для достижения boundedSize одновременных заданий. (что если boundedSize очень большой?). Я не видел, как я мог взять предмет, обработать его, а затем завершение сигнала. Я могу принимать только предметы ... и так как я не хочу сразу перебирать весь список, потребитель должен запускать его работу синхронно.

type JobNum = int 

let RunConcurrentlyBounded (boundedSize:int) (start : JobNum) (finish : JobNum) (mkJob: JobNum -> Async<Unit>) = 

    // create a BlockingCollection 
    use bc = new BlockingCollection<Async<Unit>>(boundedSize) 

    // put async jobs on BlockingCollection 
    Async.Start(async { 
     { start .. finish } 
     |> Seq.map mkJob 
     |> Seq.iter bc.Add 
     bc.CompleteAdding() 
    }) 

    // each consumer runs it's job synchronously 
    let mkConsumer (consumerId:int) = async { for job in bc.GetConsumingEnumerable() do do! job } 

    // create `boundedSize` number of consumers in parallel 
    { 1 .. boundedSize } 
    |> Seq.map mkConsumer 
    |> Async.Parallel 
    |> Async.RunSynchronously 
    |> ignore 

let Test() = 
    let boundedSize = 15 
    let start = 1 
    let finish = 50 
    let mkJob = (fun jobNum -> async { 
     printfn "%A STARTED" jobNum 
     do! Async.Sleep(5000) 
     printfn "%A COMPLETED" jobNum 
    }) 
    RunConcurrentlyBounded boundedSize start finish mkJob 

Я знаю TPL и почтовые процессоры, но подумал, что, возможно, уже было что-то простое & прочного, но позволяет избежать большого числа маршрута создания нити.

В идеале будет только одна нить производителя и одна потребительская нить; Я подозреваю, что BlockingCollection не может быть правильным примитивом параллелизма для такого случая?

+0

Почему бы не TPL? Это довольно просто использовать. – Asti

ответ

0

Это похоже на то, что я собираюсь получить, используя SemaphoreSlim.

Я полагаю, что основной ThreadPool действительно контролирует параллелизм здесь.

let RunConcurrentlySemaphore (boundedSize:int) (start : JobNum) (finish : JobNum) (mkJob: JobNum -> Async<Unit>) = 

    use ss = new SemaphoreSlim(boundedSize); 

    { start .. finish } 
     |> Seq.map (mkJob >> fun job -> async { 
      do! Async.AwaitTask(ss.WaitAsync()) 
      try do! job finally ss.Release() |> ignore 
     }) 
     |> Async.Parallel 
     |> Async.RunSynchronously 
Смежные вопросы