2016-10-04 3 views
2

Я пишу функцию потоковой передачи с экосистемой труб и, в частности, с параллельной связью труб, которая основана на операционной библиотеке, что позволяет мне быстро делать небольшие фрагменты программы, которые я получаю команды серверу по сети или команде stdin/out из командной оболочки, а затем считывать ответ. В этом случае это звездочка, но ее можно обобщить как нечто похожее.Соединительные трубы с потребителями и производителями, которые возвращают разные значения

Первоначально я написал это с трубками, но это не сработает. Причина, по которой следующий код не работает, заключается в том, что astPipe возвращает Pipe _ _ IO a, тогда как как i, так и o, из pipe-concurrency возвращаются Consumer/Producer _ IO(). Я думал о том, что astPipe дает Maybe ByteString, а затем делает вывод Consumer потребляет Maybe ByteString, но это все еще не решает проблему Producer, возвращающей ().

Я чувствую, что я действительно близок к решению, но я не могу его полностью исправить. Вы должны иметь возможность просто запускать стек для этого файла для репликации.

#!/usr/bin/env stack 
-- stack --resolver lts-6.20 runghc --package pipes --package pipes-concurrency --package operational --package process-streaming 

{-# LANGUAGE OverloadedStrings, LambdaCase #-} 
{-# LANGUAGE ScopedTypeVariables #-} 
{-# LANGUAGE GADTs #-} 
module West.Asterisk where 

import System.Process.Streaming as PS 
import Control.Monad.Operational as Op 

import Pipes as P 
import Pipes.Concurrent as PC; 

import qualified Data.ByteString.Char8 as B 

import Control.Concurrent.Async 

import GHC.IO.Exception (ExitCode) 

data Version = Version String 
data Channels = Channels 

data AsteriskInstruction a where 
    Login :: AsteriskInstruction (Maybe Version) 
    CoreShowChannels :: AsteriskInstruction (Maybe Channels) 

type Asterisk a = Program AsteriskInstruction a 

runAsterisk :: forall a. Asterisk a -> IO a 
runAsterisk m = 
    let 

    runAsterisk' :: Producer B.ByteString {- TODO Response -} IO() -> Consumer B.ByteString IO() -> Asterisk a -> IO a 
    runAsterisk' i o m' = runEffect $ i >-> astPipe m' >-> o 
     where 
     astPipe :: Asterisk a -> Pipe B.ByteString B.ByteString IO a 
     astPipe k = 
      case Op.view m' of 

      Return a -> return a 

      Login :>>= k -> do 
       yield logincmd 
       resp <- await -- :: Response 
       let v = undefined resp :: Maybe Version 
       astPipe (k v) 

      CoreShowChannels :>>= k -> do 
       yield coreshowchannelscmd 
       resp <- await 
       let c = undefined resp :: Maybe Channels 
       astPipe (k c) 

    in do 
    withSpawn unbounded $ \(out1, in1) -> do 
     async $ asteriskManager (fromInput in1) (toOutput out1) 
     runAsterisk' (fromInput in1) (toOutput out1) m 

asteriskManager :: Producer B.ByteString IO() -> Consumer B.ByteString IO() -> IO ExitCode 
asteriskManager prod cons = do 
    let ssh = shell "nc someserver 5038" 
    execute (piped ssh) (foldOut (withConsumer cons) *> feedProducer prod *> exitCode) 


logincmd, coreshowchannelscmd :: B.ByteString 
logincmd = "action: login\nusername: username\nsecret: pass\nevents: off\n\n" 
coreshowchannelscmd = "action: coreshowchannels\n\n" 

Ошибка:

Blah.hs:38:45: 
    Couldn't match type ‘a’ with ‘()’ 
     ‘a’ is a rigid type variable bound by 
      the type signature for runAsterisk :: Asterisk a -> IO a 
      at Blah.hs:33:23 
    Expected type: Proxy() B.ByteString() B.ByteString IO() 
     Actual type: Pipe B.ByteString B.ByteString IO a 
    Relevant bindings include 
     astPipe :: Asterisk a -> Pipe B.ByteString B.ByteString IO a 
     (bound at Blah.hs:41:9) 
     m' :: Asterisk a (bound at Blah.hs:38:22) 
     runAsterisk' :: Producer B.ByteString IO() 
         -> Consumer B.ByteString IO() -> Asterisk a -> IO a 
     (bound at Blah.hs:38:5) 
     m :: Asterisk a (bound at Blah.hs:34:13) 
     runAsterisk :: Asterisk a -> IO a (bound at Blah.hs:34:1) 
    In the second argument of ‘(>->)’, namely ‘astPipe m'’ 
    In the first argument of ‘(>->)’, namely ‘i >-> astPipe m'’ 
+0

Похоже, вы могли бы исправить вашу подпись типа, и было бы лучше. У вас есть 'Consumer B.ByteString IO()' в сигнатуре типа вместо 'Consumer B.ByteString IO a'. «RunEffect» возвращает все, что возвращает ваш общий «Эффект», и вы сказали ему, что он должен быть «Consumer B.ByteString IO()», но «runEffect» должен вернуть «IO a». – bheklilr

+0

Это не работает, потому что http://hackage.haskell.org/package/pipes-concurrency-2.0.6/docs/Pipes-Concurrent.html#v:fromInput явно возвращает 'Producer 'am()' .'toOutput 'похоже. Я думал о попытке их каким-то образом преобразовать, но я не уверен, что это имеет смысл. –

ответ

2

Producer s и Consumer s, возвращающие () могут остановить сами по себе. Producer s и Consumer s, которые являются полиморфными по типу возврата, никогда не останавливаются сами по себе.

Чтобы унифицировать типы возврата в вашем случае, поместите каждый в разные ветви Either, используя fmap.

runAsterisk' :: Producer B.ByteString IO() 
      -> Consumer B.ByteString IO() 
      -> Asterisk a 
      -> IO (Either() a) 
runAsterisk' i o m' = runEffect $ fmap Left i >-> fmap Right (astPipe m') >-> fmap Left o 

шаблон соответствия на Either покажет, какой компонент остановил конвейер.

Кроме того, вы можете использовать drain превратить Consumer a IO() в потребитель, который никогда не останавливается сам по себе: получили

neverStop :: Consumer a IO() -> Consumer a IO r 
neverStop consumer = consumer *> drain 

Всех входов после первоначальных потребительских остановки будут отброшены.

+0

Это именно то, на что я надеялся. Бонус: я могу сказать, что моя программа вышла из строя, или если сервер неожиданно отключил меня. –

Смежные вопросы