2014-12-29 2 views
5

Я пытаюсь прочитать группу из 50 элементов из трубы и обрабатывать их в действии ввода-вывода одновременно. (В этом случае я пытаюсь вставить данные в базу данных, и я хочу сделать целую партию внутри одной транзакции, потому что она намного эффективнее). Вот упрощенная версия того, что у меня до сих пор:Как обнаружить конец ввода с помощью труб

type ExampleType = Int 

doSomething :: [ExampleType] -> IO() 
doSomething = undefined 

inGroupsOf50 :: Monad m => Producer ExampleType m() -> m() 
inGroupsOf50 input = 
    runEffect $ input >-> loop 
     where loop = do entries <- replicateM 50 await 
        lift $ doSomething entries --Insert a bunch all in one transaction 
        loop 

Проблема, насколько я могу судить, если количество элементов для вставки не происходит деление на 50, я буду скучать некоторые. То, что мне действительно нужно, а не replicateM 50 await - это то, что дает мне до 50 элементов или меньше, если вход заканчивается, но я не могу понять, как это записать.

Я думал, что pipes-parse может быть подходящей библиотекой для просмотра. draw имеет многообещающую подпись ... но пока все биты не подходят друг другу в моей голове. У меня есть producer, я пишу consumer, и я действительно не понимаю, как это относится к концепции parser.

ответ

11

Даже больше, чем pipes-parse, вы вполне можете взглянуть на pipes-group. В частности, рассмотрим функцию

-- this type is slightly specialized 
chunksOf 
    :: Monad m => 
    Int -> 
    Lens' (Producer a m x) (FreeT (Producer a m) m x) 

Lens' немного, возможно, страшно, но могут быть устранены быстро: он утверждает, что мы можем преобразовать Producer a m x в FreeT (Producer a m) m x [0]

import Control.Lens (view) 

chunkIt :: Monad m => Int -> Producer a m x -> FreeT (Producer a m) m x 
chunkIt n = view (chunksOf n) 

Так что теперь мы должны выяснить, что делать с этим FreeT бит. В частности, мы будем хотеть копаться в free пакет и вытащить функцию iterT

iterT 
    :: (Functor f, Monad m) => 
    (f (m a) -> m a) -> 
    (FreeT f m a -> m a) 

Эта функция, iterT, давайте нам «потреблять» в FreeT один «шаг» в то время. Чтобы понять это, мы сначала специализировать тип iterT заменить f с Producer a m

runChunk :: Monad m => 
      (Producer a m (m x)  -> m x) -> 
      (FreeT (Producer a m) m x -> m x) 
runChunk = iterT 

В частности, runChunk может «запустить» а FreeT полный Producer S так долго, как мы говорим это, как преобразовать Producer a m (m x) в m -action. Это может начать более похоже. Когда мы определяем первый аргумент runChunk, нам просто нужно выполнить команду Producer, которая в этом случае будет содержать не более выбранного количества элементов.

Но каково эффективное возвращаемое значение m x? Это «продолжение», например. все куски, которые приходят после текущий, который вы пишете. Так, например, давайте предположим, что у нас есть Producer из Char с и мы хотели бы, чтобы напечатать и LINEBREAK после 3-х символов

main :: IO() 
main = flip runChunk (chunkIt 3 input) $ \p -> _ 

_ отверстие в этой точке имеет тип IO() с p в контексте в type p :: Producer Char IO (IO()). Мы можем использовать этот канал с for, собрать его возвращаемый тип (который является продолжением, снова), испустить новую строку, а затем запустить продолжение.

input :: Monad m => Producer Char m() 
input = each "abcdefghijklmnopqrstuvwxyz" 

main :: IO() 
main = flip runChunk (chunkIt 3 input) $ \p -> do 
    cont <- runEffect $ for p (lift . putChar) 
    putChar '\n' 
    cont 

И это ведет себя точно так, как требуется

λ> main 
abc 
def 
ghi 
jkl 
mno 
pqr 
stu 
vwx 
yz 

Чтобы было ясно, в то время как я сделал немного изложения, это довольно простой код, как только вы видите, как все части подходят друг к другу. Вот полный список:

input :: Monad m => Producer Char m() 
input = each "abcdefghijklmnopqrstuvwxyz" 

main :: IO() 
main = flip iterT (input ^. chunksOf 3) $ \p -> do 
    cont <- runEffect $ for p $ \c -> do 
    lift (putChar c) 
    putChar '\n' 
    cont 

[0] Также немного больше, но этого пока достаточно.

Смежные вопросы