2016-08-07 2 views
2

Как бы вы собираете результаты списка Async a в Haskell как они становятся доступными? Идея состоит в том, чтобы начать обработку результатов асинхронных задач, как только они будут доступны.Сбор результатов ASync как они становятся доступными

Лучшее, что я мог придумать следующая функция:

collect :: [Async a] -> IO [a] 
collect [] = return [] 
collect asyncs = do 
    (a, r) <- waitAny asyncs 
    rs <- collect (filter (/= a) asyncs) 
    return (r:rs) 

Однако эта функция не проявляет желаемое поведение, поскольку, как было указано в комментарии ниже, не возвращается, пока все асинхронные задачи завершены. Кроме того, collect работает в O(n^2), так как я фильтрую список на каждом рекурсивном шаге. Это может быть улучшено за счет использования более эффективной структуры (и, возможно, индексации позиции значений Async в списке).

Возможно, есть функции библиотеки, которые позаботились об этом, но я не смог найти их в модуле Control.Concurrent.Async, и мне интересно, почему.

EDIT: после того, как немного более тщательно думать проблему, я задаюсь вопросом, такая функция является ли хорошей идеей. Я мог бы просто использовать fmap для асинхронных задач. Может быть, лучше подождать результатов, когда нет другого выбора.

+2

Ваш 'collect' не возвращается, пока все' Async 'в списке не будут выполнены. Это то, что вам нужно? –

+0

Нет. Я этого не осознавал. Моя реализация действительно ошибается. Моя идея заключалась в том, что «collect» возвращает результат, как только асинхронный поток будет готов. Я отредактирую свой ответ соответственно. –

ответ

1

Как я уже упоминал в my other answer, потоковые результаты из списка Async s, поскольку они становятся доступными, достигаются с использованием библиотеки обработки потока. Вот пример, используя pipes.

import Control.Concurrent (threadDelay) 
import Control.Concurrent.Async 
import Control.Concurrent.STM 
import Data.Functor (($>)) 
import Pipes 
import Pipes.Concurrent -- from the pipes-concurrency package 
import qualified Pipes.Prelude as P 


asCompleted :: MonadIO m => [Async a] -> Producer a m() 
asCompleted asyncs = do 
    (o, i, seal) <- liftIO $ spawn' unbounded 
    liftIO $ forkIO $ do 
     forConcurrently asyncs (\async -> atomically $ waitSTM async >>= send o) 
     atomically seal 
    fromInput i 

main = do 
    actions <- traverse async [threadDelay 2000000 $> "bar", threadDelay 1000000 $> "foo"] 
    runEffect $ asCompleted actions >-> P.print 
-- after one second, prints "foo", then "bar" a second later 

Использование pipes-concurrency мы spawn'Output - Input пара и немедленно преобразовать Input к Producer с помощью fromInput. Асинхронно мы получаем send элементов по мере их появления. Когда все Async s завершены, мы входим, чтобы закрыть Producer.

0

Я читаю ваш вопрос как «можно ли отсортировать список Async по времени их завершения?». Если это то, что вы имели в виду, ответ да.

import Control.Applicative (liftA2) 
import Control.Concurrent (threadDelay) 
import Control.Concurrent.Async 
import Data.Functor (($>)) 
import Data.List (sortBy) 
import Data.Ord (comparing) 
import Data.Time (getCurrentTime) 


sortByCompletion :: [Async a] -> IO [a] 
sortByCompletion = fmap (fmap fst . sortBy (comparing snd)) . mapConcurrently withCompletionTime 
    where withCompletionTime async = liftA2 (,) (wait async) getCurrentTime 

main = do 
    asyncs <- traverse async [threadDelay 2000000 $> "bar", threadDelay 1000000 $> "foo"] 
    sortByCompletion asyncs 
-- ["foo", "bar"], after two seconds 

Использование mapConcurrently мы ждем каждого Async в отдельном потоке. По завершении мы получаем текущее время - время завершения Async - и используем его для сортировки результатов. Это сложность O (n log n), потому что мы сортируем список. (Ваш оригинальный алгоритм был эффективно selection sort.)

Как ваши collect, sortByCompletion не возвращается, пока все Async s в списке не завершена. Если вы хотите получить поток результатов на основную тему, так как они становятся доступными, ну, списки не очень хороший инструмент для этого. Я бы использовал поточную абстракцию, такую ​​как conduit или pipes, или, работая на более низком уровне, TQueue. См. Пример my other answer.

+0

Я не хочу сортировать значения «Async» по времени завершения, но иметь возможность начать обработку, как только результат станет доступным. Спасибо за указание на это. Я добавлю это разъяснение к моему вопросу. –

1

Осуществляется с помощью TChan, дополнительно реализована версия, которая может реагировать немедленно, но он более сложен, а также могут возникнуть проблемы с исключениями (если вы хотите получить исключения, используйте SlaveThread.fork вместо forkIO), так что я заметил, что код Если вы не заинтересованы в нем:

import   Control.Concurrent  (threadDelay) 
import   Control.Concurrent  (forkIO) 
import   Control.Concurrent.Async 
import   Control.Concurrent.STM 
import   Control.Monad 

collect :: [Async a] -> IO [a] 
collect = atomically . collectSTM 

collectSTM :: [Async a] -> STM [a] 
collectSTM as = do 
    c <- newTChan 
    collectSTMChan c as 

collectSTMChan :: TChan a -> [Async a] -> STM [a] 
collectSTMChan chan as = do 
    mapM_ (waitSTM >=> writeTChan chan) as 
    replicateM (length as) (readTChan chan) 

main :: IO() 
main = do 
    a1 <- async (threadDelay 2000000 >> putStrLn "slept 2 secs" >> return 2) 
    a2 <- async (threadDelay 3000000 >> putStrLn "slept 3 secs" >> return 3) 
    a3 <- async (threadDelay 1000000 >> putStrLn "slept 1 sec" >> return 1) 
    res <- collect [a1,a2,a3] 
    putStrLn (show res) 

    -- -- reacting immediately 
    -- a1 <- async (threadDelay 2000000 >> putStrLn "slept 2 secs" >> return 2) 
    -- a2 <- async (threadDelay 3000000 >> putStrLn "slept 3 secs" >> return 3) 
    -- a3 <- async (threadDelay 1000000 >> putStrLn "slept 1 sec" >> return 1) 
    -- c <- collectChan [a1,a2,a3] 
    -- replicateM_ 3 (atomically (readTChan c) >>= \v -> putStrLn ("Received: " ++ show v)) 

-- collectChan :: [Async a] -> IO (TChan a) 
-- collectChan as = do 
--  c <- newTChanIO 
--  forM_ as $ \a -> forkIO ((atomically . (waitSTM >=> writeTChan c)) a) 
--  return c 
Смежные вопросы