一个处理pipe道,2个相同types的IO源

在我的GHC Haskell应用程序利用stm,networkingpipe道和pipe道,我有一个分支为每个套接字使用runTCPServer自动分叉。 股可以通过使用广播TChan与其他股沟通。

这展示了我想如何build立pipe道“链”:

在这里输入图像说明

所以,我们这里有两个源(每个绑定到helper的pipe道),它产生一个Packet对象, encoder将接受并转换成ByteString ,然后发送出套接字。 对于两种投入的有效融合(性能是一个问题)我有很大的困难。

如果有人能指出我正确的方向,我将不胜感激。


既然发这个问题而不作出任何尝试是不礼貌的,我会把我以前在这里试过的东西放进去;

我写/select了一个函数(阻塞)从TMChan(可closures的通道)产生一个源。

 -- | Takes a generic type of STM chan and, given read and close functionality, -- returns a conduit 'Source' which consumes the elements of the channel. chanSource :: (MonadIO m, MonadSTM m) => a -- ^ The channel -> (a -> STM (Maybe b)) -- ^ The read function -> (a -> STM ()) -- ^ The close/finalizer function -> Source mb chanSource ch readCh closeCh = ConduitM pull where close = liftSTM $ closeCh ch pull = PipeM $ liftSTM $ readCh ch >>= translate translate = return . maybe (Done ()) (HaveOutput pull close) 

同样,将陈变成水槽的function也是如此。

 -- | Takes a stream and, given write and close functionality, returns a sink -- which wil consume elements and broadcast them into the channel chanSink :: (MonadIO m, MonadSTM m) => a -- ^ The channel -> (a -> b -> STM()) -- ^ The write function -> (a -> STM()) -- ^ The close/finalizer function -> Sink bm () chanSink ch writeCh closeCh = ConduitM sink where close = const . liftSTM $ closeCh ch sink = NeedInput push close write = liftSTM . writeCh ch push x = PipeM $ write x >> return sink 

然后mergeSources很简单; 叉2线程(我真的不想做的,但它是什么),可以把他们的新项目进入一个列表,然后我产生一个来源;

 -- | Merges a list of 'Source' objects, sinking them into a 'TMChan' and returns -- a source which consumes the elements of the channel. mergeSources :: (MonadIO m, MonadBaseControl IO m, MonadSTM m) => [Source (ResourceT m) a] -- ^ The list of sources -> ResourceT m (Source (ResourceT m) a) mergeSources sx = liftSTM newTMChan >>= liftA2 (>>) (fsrc sx) retn where push cs = s $$ chanSink c writeTMChan closeTMChan fsrc xc = mapM_ (\s -> resourceForkIO $ push cs) x retn c = return $ chanSource c readTMChan closeTMChan 

虽然我成功地完成了这些function,但我没有成功地利用这些function来进行types检查。

 -- | Helper which represents a conduit chain for each client connection serverApp :: Application SessionIO serverApp appdata = do use ssBroadcast >>= liftIO . atomically . dupTMChan >>= assign ssBroadcast -- appSource appdata $$ decoder $= protocol =$= encoder =$ appSink appdata mergsrc $$ protocol $= encoder =$ appSink appdata where chansrc = chanSource (use ssBroadcast) readTMChan closeTMChan mergsrc = mergeSources [appSource appdata $= decoder, chansrc] -- | Structure which holds mutable information for clients data SessionState = SessionState { _ssBroadcast :: TMChan Packet -- ^ Outbound packet broadcast channel } makeLenses ''SessionState -- | A transformer encompassing both SessionReader and SessionState type Session m = ReaderT SessionReader (StateT SessionState m) -- | Macro providing Session applied to an IO monad type SessionIO = Session IO 

无论如何,我认为这种方法存在缺陷 – 有许多中间列表和转换。 这对性能不是很好。 寻求指导。


PS。 据我所知,这不是重复的; 如同我的情况一样, 将导pipe与多个input进行融合 ,只要我没有等待另一个对象准备好被消耗,那么这两个来源都会产生相同的types,我不在乎从哪个来源生成Packet对象。

PPS。 我对示例代码中镜头的使用(以及对知识的要求)表示歉意。

我不知道是否有任何帮助,但我尝试实施Iain的build议,并制定了mergeSources'一个变种, mergeSources'有任何渠道:

 mergeSources' :: (MonadIO m, MonadBaseControl IO m) => [Source (ResourceT m) a] -- ^ The sources to merge. -> Int -- ^ The bound of the intermediate channel. -> ResourceT m (Source (ResourceT m) a) mergeSources' sx bound = do c <- liftSTM $ newTBMChan bound mapM_ (\s -> resourceForkIO $ s $$ chanSink c writeTBMChan closeTBMChan) sx return $ sourceTBMChan c 

(这个简单的添加在这里可用)。

一些对你的mergeSources版本的mergeSources (带一粒盐,可能是我没有很好的理解):

  • ...TMChan而不是...TBMChan似乎很危险。 如果作家比读者快,你的堆就会吹。 看看你的图,看起来这很容易发生,如果你的TCP对端不够快的读取数据。 所以我肯定会用...TBMChan ,可能大但有限的界限。
  • 您不需要MonadSTM m约束。 所有STM的东西都被封装到IO

     liftSTM = liftIO . atomically 

    serverApp使用mergeSources'时,这可能会稍微有所帮助。

  • 只是一个美容问题,我发现

     liftSTM newTMChan >>= liftA2 (>>) (fsrc sx) retn 

    由于在(->) r monad上使用liftA2 ,所以很难阅读。 我会说

     do c <- liftSTM newTMChan fsrc sx c retn c 

    会更长,但更容易阅读。

你可能会创build一个自包含的项目,可以玩serverApp吗?