Я пишу функцию потоковой передачи с экосистемой труб и, в частности, с параллельной связью труб, которая основана на операционной библиотеке, что позволяет мне быстро делать небольшие фрагменты программы, которые я получаю команды серверу по сети или команде stdin/out из командной оболочки, а затем считывать ответ. В этом случае это звездочка, но ее можно обобщить как нечто похожее.Соединительные трубы с потребителями и производителями, которые возвращают разные значения
Первоначально я написал это с трубками, но это не сработает. Причина, по которой следующий код не работает, заключается в том, что astPipe возвращает Pipe _ _ IO a
, тогда как как i, так и o, из pipe-concurrency возвращаются Consumer/Producer _ IO()
. Я думал о том, что astPipe
дает Maybe ByteString
, а затем делает вывод Consumer
потребляет Maybe ByteString
, но это все еще не решает проблему Producer
, возвращающей ()
.
Я чувствую, что я действительно близок к решению, но я не могу его полностью исправить. Вы должны иметь возможность просто запускать стек для этого файла для репликации.
#!/usr/bin/env stack
-- stack --resolver lts-6.20 runghc --package pipes --package pipes-concurrency --package operational --package process-streaming
{-# LANGUAGE OverloadedStrings, LambdaCase #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE GADTs #-}
module West.Asterisk where
import System.Process.Streaming as PS
import Control.Monad.Operational as Op
import Pipes as P
import Pipes.Concurrent as PC;
import qualified Data.ByteString.Char8 as B
import Control.Concurrent.Async
import GHC.IO.Exception (ExitCode)
data Version = Version String
data Channels = Channels
data AsteriskInstruction a where
Login :: AsteriskInstruction (Maybe Version)
CoreShowChannels :: AsteriskInstruction (Maybe Channels)
type Asterisk a = Program AsteriskInstruction a
runAsterisk :: forall a. Asterisk a -> IO a
runAsterisk m =
let
runAsterisk' :: Producer B.ByteString {- TODO Response -} IO() -> Consumer B.ByteString IO() -> Asterisk a -> IO a
runAsterisk' i o m' = runEffect $ i >-> astPipe m' >-> o
where
astPipe :: Asterisk a -> Pipe B.ByteString B.ByteString IO a
astPipe k =
case Op.view m' of
Return a -> return a
Login :>>= k -> do
yield logincmd
resp <- await -- :: Response
let v = undefined resp :: Maybe Version
astPipe (k v)
CoreShowChannels :>>= k -> do
yield coreshowchannelscmd
resp <- await
let c = undefined resp :: Maybe Channels
astPipe (k c)
in do
withSpawn unbounded $ \(out1, in1) -> do
async $ asteriskManager (fromInput in1) (toOutput out1)
runAsterisk' (fromInput in1) (toOutput out1) m
asteriskManager :: Producer B.ByteString IO() -> Consumer B.ByteString IO() -> IO ExitCode
asteriskManager prod cons = do
let ssh = shell "nc someserver 5038"
execute (piped ssh) (foldOut (withConsumer cons) *> feedProducer prod *> exitCode)
logincmd, coreshowchannelscmd :: B.ByteString
logincmd = "action: login\nusername: username\nsecret: pass\nevents: off\n\n"
coreshowchannelscmd = "action: coreshowchannels\n\n"
Ошибка:
Blah.hs:38:45:
Couldn't match type ‘a’ with ‘()’
‘a’ is a rigid type variable bound by
the type signature for runAsterisk :: Asterisk a -> IO a
at Blah.hs:33:23
Expected type: Proxy() B.ByteString() B.ByteString IO()
Actual type: Pipe B.ByteString B.ByteString IO a
Relevant bindings include
astPipe :: Asterisk a -> Pipe B.ByteString B.ByteString IO a
(bound at Blah.hs:41:9)
m' :: Asterisk a (bound at Blah.hs:38:22)
runAsterisk' :: Producer B.ByteString IO()
-> Consumer B.ByteString IO() -> Asterisk a -> IO a
(bound at Blah.hs:38:5)
m :: Asterisk a (bound at Blah.hs:34:13)
runAsterisk :: Asterisk a -> IO a (bound at Blah.hs:34:1)
In the second argument of ‘(>->)’, namely ‘astPipe m'’
In the first argument of ‘(>->)’, namely ‘i >-> astPipe m'’
Похоже, вы могли бы исправить вашу подпись типа, и было бы лучше. У вас есть 'Consumer B.ByteString IO()' в сигнатуре типа вместо 'Consumer B.ByteString IO a'. «RunEffect» возвращает все, что возвращает ваш общий «Эффект», и вы сказали ему, что он должен быть «Consumer B.ByteString IO()», но «runEffect» должен вернуть «IO a». – bheklilr
Это не работает, потому что http://hackage.haskell.org/package/pipes-concurrency-2.0.6/docs/Pipes-Concurrent.html#v:fromInput явно возвращает 'Producer 'am()' .'toOutput 'похоже. Я думал о попытке их каким-то образом преобразовать, но я не уверен, что это имеет смысл. –