2012-03-28 5 views
1

Я извлекаю данные из API Bloomberg и удивляюсь медленности. Мое вычисление ограничено этим.преобразование блокировки потока для блокировки без потока в f #

Поэтому я решил использовать какой-то асинхронный построитель монады, чтобы развязать его. После запуска результаты не намного лучше, что было очевидно, когда я делаю вызов функции NextEvent, которая является блокировкой потоков.

 let outerloop args dic = 
     ... 
     let rec innerloop continuetoloop = 
      let eventObj = session.NextEvent(); //This blocks 
      ... 

    let seqtable = reader.ReadFile(@"C:\homeware\sector.csv", ";".[0], true) 

    let dic = ConcurrentDictionary<_,_>() 
    let wf = seqtable |> Seq.mapi (fun i item -> async { outerloop item dic }) 
    wf |> Async.Parallel 
     |> Async.RunSynchronously 
     |> ignore 
    printfn "%A" ret 

Есть хороший способ, чтобы обернуть, что блокирующий вызов к блокирующим вызову? Кроме того, почему структура async не создает столько потоков, сколько у меня есть запросы (например, 200)? когда я инспектировать нити, из которых я получаю значение я вижу только 4-5, которые используются ..

UPDATE

Я нашел убедительную причину, почему он никогда не будет возможно. Операция async берет то, что после инструкции async, и планируйте ее где-то в threadpool. для всего, что имеет значение, до тех пор, пока функция асинхронизации используется правильно, то есть, всегда возвращаясь к файлу threadpool, от которого он произошел, мы можем считать, что мы выполняемся в одном потоке.

Будучи на одном потоке, все планирование должно выполняться где-то позже, а команда блокировки не может избежать того факта, что, в конце концов, после ее запуска ему придется блокировать в какой-то момент в будущем поток.

+0

Как бы создать 200 потоков вам ни при каких обстоятельствах? Если у вас было 200 соединений, у вас будет 200 очень медленных соединений, а не только несколько быстрых. (Не упоминание 200 МБ накладных расходов.) – svick

+0

Кроме того, если ваша операция блокируется IO, то использование большего количества ЦП не принесет вам большой пользы. Он медленный, потому что сеть работает медленно, и вы не будете делать свою сеть быстрее, используя больше потоков. – svick

+0

@svick Почему бы не 200 медленных соединений быть в 40 раз лучше, чем 5 медленных соединений для IO связанных вычислений? Я вижу, что сейчас он обрабатывает 40 пучков из 5 операций, что будет лучше обслуживать 1 пучок из 200 операций. Или я чего-то не хватает? – nicolas

ответ

2

Есть ли хороший способ обернуть этот блокирующий вызов неблокируемому звонку?

Нет. Вы никогда не сможете переносить блокирующие вызовы, чтобы сделать их неблокирующими. Если бы вы могли, асинк был бы просто шаблоном проектирования, а не фундаментальным сдвигом парадигмы.

Кроме того, почему инфраструктура async не создает столько потоков, сколько у меня есть запросы (например, 200)?

Async построен на пуле потоков, который разработан, чтобы не создавать потоки агрессивно, потому что они настолько дороги. Весь смысл (реального) async состоит в том, что он не требует столько потоков, сколько есть соединений. Вы можете обрабатывать 10 000 одновременных соединений с ~ 30 потоками.

Возможно, у вас есть полное непонимание того, что такое асинхронное и для чего оно предназначено. Я бы предложил купить любую из книг F #, которые охватывают эту тему и читают ее. В частности, ваше решение не является асинхронным, потому что вы просто вызываете свой блокирующий элемент StartGetFieldsValue из вашего рабочего процесса async, победив цель async. Вместо этого вы можете просто сделать Array.Parallel.map getFieldsValue.

Кроме того, вы хотите использовать чисто функциональный API, когда делаете что-то параллельно, а не мутируете ConcurrentDictionary на месте. Так заменить req.StartGetFieldsValue ret с

let! result = req.StartGetFieldsValue() 
... 
return result 

и заменить ignore с dict.

+0

Я не надеялся на решение, использующее async для первого pb. как я читал, asyncs - это просто перезапись CPS. (для словаря можно, если не эффективно для этого конкретного API, делать запрос на несколько ценных бумаг, по нескольким полям, а информация идет по битам по битам. Какая самая маленькая единица имеет смысл ... там вероятно, хороший способ его упаковки, но я не знаю об этом) – nicolas

+0

, что, будучи сказанным, я обязательно прочитаю еще несколько материалов, потому что должен быть способ правильно отобразить этот api .. – nicolas

+0

Ваше замечание заставило меня снова подумать на каких действительно асинхронных вычислениях. и хотя мы можем сказать, что они просто переписывают CPS, это просто форма кода кода. реальная история, если я не ошибаюсь в этом, заключается в том, что асинхронные вычисления для * threadpools *, какие вычисления синхронизации предназначены для * потоков *. поэтому они «чувствуют» себя как синхронные вычисления большую часть времени. threadpool, как вы указали, чрезвычайно полезен для того, что мы делаем в 99% случаев, асинхронные конструкции отражают эту силу. – nicolas

0

Вот решение, которое я сделал, что, похоже, работает. Конечно, он не использует только async (минус a), но также Async.

Я определяю простой тип, который имеет одно событие, законченное и один метод asyncstart, с помощью метода для запуска в качестве аргумента. он запускает метод, затем срабатывает событие закончилось в соответствующем месте (в моем случае я должен был захватить контекст синхронизации и т.д ..)

Тогда в стороне потребителя, я использую просто

let! completion = Async.Waitfromevent wapper.finished |> Async.StartAsChild 
let! completed = completion 

В то время как запуск этого кода, со стороны потребителя, я использую только асинхронные вызовы, что делает мой код незаблокированным. Конечно, где-то есть какой-то поток, который заблокирован, но это происходит за пределами моей основной рабочей петли, которая остается реактивной и подходящей.

Смежные вопросы