Я хочу обрабатывать коллекцию заданий io-bound одновременно, но привязывать/ограничивать количество выдающихся (активно работающих) параллельных заданий.Связанные задания/дросселирование одновременных заданий, не создавая нить за задание
Chunking - это простой способ увеличить параллелизм, но создает узкие места, если предметы занимают различное количество времени.
Способ, которым я нашел для этого, имеет некоторые проблемы 1)
. Есть ли способ сделать это, избегая проблем ниже, оставаясь сравнимо идиоматичным и кратким?
1)
используйте BlockingCollection (показано ниже). Однако это приводит к решению, в котором параллелизм здесь генерируется boundedSize
количеством «потребительских» потоков. Я ищу решение, которое не требует boundedSize
количества потоков для достижения boundedSize
одновременных заданий. (что если boundedSize
очень большой?). Я не видел, как я мог взять предмет, обработать его, а затем завершение сигнала. Я могу принимать только предметы ... и так как я не хочу сразу перебирать весь список, потребитель должен запускать его работу синхронно.
type JobNum = int
let RunConcurrentlyBounded (boundedSize:int) (start : JobNum) (finish : JobNum) (mkJob: JobNum -> Async<Unit>) =
// create a BlockingCollection
use bc = new BlockingCollection<Async<Unit>>(boundedSize)
// put async jobs on BlockingCollection
Async.Start(async {
{ start .. finish }
|> Seq.map mkJob
|> Seq.iter bc.Add
bc.CompleteAdding()
})
// each consumer runs it's job synchronously
let mkConsumer (consumerId:int) = async { for job in bc.GetConsumingEnumerable() do do! job }
// create `boundedSize` number of consumers in parallel
{ 1 .. boundedSize }
|> Seq.map mkConsumer
|> Async.Parallel
|> Async.RunSynchronously
|> ignore
let Test() =
let boundedSize = 15
let start = 1
let finish = 50
let mkJob = (fun jobNum -> async {
printfn "%A STARTED" jobNum
do! Async.Sleep(5000)
printfn "%A COMPLETED" jobNum
})
RunConcurrentlyBounded boundedSize start finish mkJob
Я знаю TPL и почтовые процессоры, но подумал, что, возможно, уже было что-то простое & прочного, но позволяет избежать большого числа маршрута создания нити.
В идеале будет только одна нить производителя и одна потребительская нить; Я подозреваю, что BlockingCollection не может быть правильным примитивом параллелизма для такого случая?
Почему бы не TPL? Это довольно просто использовать. – Asti