2015-03-19 2 views
2

Я хотел бы использовать Control.Concurrent.Async mapConcurrently для параллельной загрузки с помощью http-conduit. Решение here не является достаточным для моего случая, потому что я хотел бы обработать п задачи, но душить количество одновременных рабочих м, где м < н.Запуск параллельных загрузок URL-адресов с рабочим пулом в Haskell

Это не достаточно, либо перейти к mapConcurrently нескольких кусков м, потому что тогда число активных работников будет иметь тенденцию быть менее м, так как некоторые из задач, будет завершена раньше, чем другие, оставляя зазор использования ,

Есть ли простой способ - почти так же просто, как использовать mapConcurrently Надеюсь - реализовать рабочий пул, одновременно выполняющий очередь задач до тех пор, пока все задачи не будут выполнены?

Или проще просто сохранить Haskell простым и сделать параллелизм уровня процесса с помощью xargs -P?

+0

Несколько связанных SO вопрос: http://stackoverflow.com/questions/15191649/are-thread-pools-needed -for-pure-haskell-code – danidiaz

ответ

3

Возможно, самым простым решением является удушения IO действия с использованием semaphore перед оборачивать их в Concurrently, используя вспомогательную функцию, как это:

withConc :: QSem -> (a -> IO b) -> (a -> Concurrently b) 
withConc sem f = \a -> Concurrently 
    (bracket_ (waitQSem sem) (signalQSem sem) (f a)) 

Мы можем использовать withConc в сочетании с traverse, чтобы выполнить дросселированном одновременно обход любого Traversable контейнера задач:

traverseThrottled :: Int -> (a -> IO b) -> [a] -> IO [b] 
traverseThrottled concLevel action tasks = do 
    sem <- newQSem concLevel 
    runConcurrently (traverse (withConc sem action) tasks) 

Одним из недостатков этого подхода является то, что т он использует Concurrently, будет создавать столько потоков, сколько есть задач, и только подмножество из них будет выполнять фактическую работу в любой момент времени, благодаря семафору.

С другой стороны, потоки в Haskell дешевы, поэтому я думаю, что это приемлемое решение в случаях, когда количество задач не очень велико.

Edit: Предоставление traverseThrottled более общую подпись:

import Data.Traversable 
import Control.Concurrent 
import Control.Concurrent.Async 
import Control.Exception 

traverseThrottled :: Traversable t => Int -> (a -> IO b) -> t a -> IO (t b) 
traverseThrottled concLevel action taskContainer = do 
    sem <- newQSem concLevel 
    let throttledAction = bracket_ (waitQSem sem) (signalQSem sem) . action 
    runConcurrently (traverse (Concurrently . throttledAction) taskContainer) 
+0

Спасибо, этот ответ выглядит великолепно. Потоки, конечно, будут дешевле, чем разворачивание нескольких процессов Haskell с помощью 'xargs -P' – dan

2

Я бы предложил использовать parallel или parallelInterleaved из parallel-io. Он обладает (помимо прочего) этими свойствами;

  1. Никогда не создает более или менее разблокировал потоков, чем указанные жить в бассейне. NB: этот счетчик включает поток, выполняющий параллель. Это должно свести к минимуму конфликт и, следовательно, упреждение, а также предотвращение голодания.
  2. С возвращением все действия выполнены.
  3. Функция возвращается своевременно, как только все действия были выполнены.
+0

. Мое использование на самом деле не для параллелизма N рабочих по N ядрам, но параллелизм, когда запросы HTTP могут занимать произвольное количество времени для завершения и оставить процессор не работает в ожидании сети. Соответствует ли 'parallel-io' для последнего случая? – dan

+0

@dan Да, «Пул» определяет количество рабочих работников _m_, а затем обрабатывает заданный список _n_ задач, так что каждый раз, когда _m_ работники выполняются. –

+0

Если я использую parallel-io, мне всегда приходится компилироваться с дополнительными флагами '-threaded and supply + RTS -N2 -RTS', чтобы заставить его работать? Это кажется немного раздражающим. – dan

1

Вы можете использовать monad-par для него, который, как async, сделан Саймоном Марлоу.

Пример:

import Control.Concurrent (threadDelay) 
import Control.Monad.IO.Class (liftIO) 
import Control.Monad.Par.Combinator (parMapM) 
import Control.Monad.Par.IO (runParIO) 

download :: Int -> IO String 
download i = do 
    putStrLn ("downloading " ++ show i) 
    threadDelay 1000000 -- sleep 1 second 
    putStrLn ("downloading " ++ show i ++ " finished") 
    return "fake response" 


main :: IO() 
main = do 
    -- "pooled" mapM 
    responses <- runParIO $ parMapM (\i -> liftIO $ download i) [1..10] 
    mapM_ putStrLn responses 

компилировать с ghc --make -threaded PooledMapM.hs, работать как ./PooledMapM +RTS -N2.

Вы увидите результат:

downloading 10 
downloading 9 
downloading 9 finished 
downloading 10 finished 
downloading 8 
downloading 7 
downloading 8 finished 
downloading 7 finished 
downloading 6 
downloading 5 
downloading 6 finished 
downloading 5 finished 
downloading 4 
downloading 3 
downloading 4 finished 
downloading 3 finished 
downloading 2 
downloading 1 
downloading 2 finished 
downloading 1 finished 
fake response 
fake response 
fake response 
fake response 
fake response 
fake response 
fake response 
fake response 
fake response 
fake response 

код также можно получить через https://gist.github.com/nh2/bfa3f182da9d13baa536

+0

Можете ли вы контролировать количество параллелизма чем-то другим, чем параметр RTS? – phadej

+0

@phadej В этом случае ответ Петра с 'parallel-io' кажется хорошим. – nh2