一个处理pipe道,2个相同types的IO源
在我的GHC Haskell
应用程序利用stm,networkingpipe道和pipe道,我有一个分支为每个套接字使用runTCPServer
自动分叉。 股可以通过使用广播TChan与其他股沟通。
这展示了我想如何build立pipe道“链”:
所以,我们这里有两个源(每个绑定到helper的pipe道),它产生一个Packet
对象, encoder
将接受并转换成ByteString
,然后发送出套接字。 对于两种投入的有效融合(性能是一个问题)我有很大的困难。
如果有人能指出我正确的方向,我将不胜感激。
既然发这个问题而不作出任何尝试是不礼貌的,我会把我以前在这里试过的东西放进去;
我写/select了一个函数(阻塞)从TMChan(可closures的通道)产生一个源。
-- | Takes a generic type of STM chan and, given read and close functionality, -- returns a conduit 'Source' which consumes the elements of the channel. chanSource :: (MonadIO m, MonadSTM m) => a -- ^ The channel -> (a -> STM (Maybe b)) -- ^ The read function -> (a -> STM ()) -- ^ The close/finalizer function -> Source mb chanSource ch readCh closeCh = ConduitM pull where close = liftSTM $ closeCh ch pull = PipeM $ liftSTM $ readCh ch >>= translate translate = return . maybe (Done ()) (HaveOutput pull close)
同样,将陈变成水槽的function也是如此。
-- | Takes a stream and, given write and close functionality, returns a sink -- which wil consume elements and broadcast them into the channel chanSink :: (MonadIO m, MonadSTM m) => a -- ^ The channel -> (a -> b -> STM()) -- ^ The write function -> (a -> STM()) -- ^ The close/finalizer function -> Sink bm () chanSink ch writeCh closeCh = ConduitM sink where close = const . liftSTM $ closeCh ch sink = NeedInput push close write = liftSTM . writeCh ch push x = PipeM $ write x >> return sink
然后mergeSources很简单; 叉2线程(我真的不想做的,但它是什么),可以把他们的新项目进入一个列表,然后我产生一个来源;
-- | Merges a list of 'Source' objects, sinking them into a 'TMChan' and returns -- a source which consumes the elements of the channel. mergeSources :: (MonadIO m, MonadBaseControl IO m, MonadSTM m) => [Source (ResourceT m) a] -- ^ The list of sources -> ResourceT m (Source (ResourceT m) a) mergeSources sx = liftSTM newTMChan >>= liftA2 (>>) (fsrc sx) retn where push cs = s $$ chanSink c writeTMChan closeTMChan fsrc xc = mapM_ (\s -> resourceForkIO $ push cs) x retn c = return $ chanSource c readTMChan closeTMChan
虽然我成功地完成了这些function,但我没有成功地利用这些function来进行types检查。
-- | Helper which represents a conduit chain for each client connection serverApp :: Application SessionIO serverApp appdata = do use ssBroadcast >>= liftIO . atomically . dupTMChan >>= assign ssBroadcast -- appSource appdata $$ decoder $= protocol =$= encoder =$ appSink appdata mergsrc $$ protocol $= encoder =$ appSink appdata where chansrc = chanSource (use ssBroadcast) readTMChan closeTMChan mergsrc = mergeSources [appSource appdata $= decoder, chansrc] -- | Structure which holds mutable information for clients data SessionState = SessionState { _ssBroadcast :: TMChan Packet -- ^ Outbound packet broadcast channel } makeLenses ''SessionState -- | A transformer encompassing both SessionReader and SessionState type Session m = ReaderT SessionReader (StateT SessionState m) -- | Macro providing Session applied to an IO monad type SessionIO = Session IO
无论如何,我认为这种方法存在缺陷 – 有许多中间列表和转换。 这对性能不是很好。 寻求指导。
PS。 据我所知,这不是重复的; 如同我的情况一样, 将导pipe与多个input进行融合 ,只要我没有等待另一个对象准备好被消耗,那么这两个来源都会产生相同的types,我不在乎从哪个来源生成Packet
对象。
PPS。 我对示例代码中镜头的使用(以及对知识的要求)表示歉意。
我不知道是否有任何帮助,但我尝试实施Iain的build议,并制定了mergeSources'
一个变种, mergeSources'
有任何渠道:
mergeSources' :: (MonadIO m, MonadBaseControl IO m) => [Source (ResourceT m) a] -- ^ The sources to merge. -> Int -- ^ The bound of the intermediate channel. -> ResourceT m (Source (ResourceT m) a) mergeSources' sx bound = do c <- liftSTM $ newTBMChan bound mapM_ (\s -> resourceForkIO $ s $$ chanSink c writeTBMChan closeTBMChan) sx return $ sourceTBMChan c
(这个简单的添加在这里可用)。
一些对你的mergeSources
版本的mergeSources
(带一粒盐,可能是我没有很好的理解):
- 用
...TMChan
而不是...TBMChan
似乎很危险。 如果作家比读者快,你的堆就会吹。 看看你的图,看起来这很容易发生,如果你的TCP对端不够快的读取数据。 所以我肯定会用...TBMChan
,可能大但有限的界限。 -
您不需要
MonadSTM m
约束。 所有STM的东西都被封装到IO
中liftSTM = liftIO . atomically
在
serverApp
使用mergeSources'
时,这可能会稍微有所帮助。 -
只是一个美容问题,我发现
liftSTM newTMChan >>= liftA2 (>>) (fsrc sx) retn
由于在
(->) r
monad上使用liftA2
,所以很难阅读。 我会说do c <- liftSTM newTMChan fsrc sx c retn c
会更长,但更容易阅读。
你可能会创build一个自包含的项目,可以玩serverApp
吗?