减lessHaskell程序中的垃圾收集暂停时间

我们正在开发一个接收和转发“消息”的程序,同时保留这些消息的临时logging,以便它可以在需要时告诉你消息历史。 消息是数字标识的,通常大小在1千字节左右,我们需要保留成千上万条消息。

我们希望优化这个程序的延迟:发送和接收消息之间的时间必须低于10毫秒。

该程序用Haskell编写,并与GHC编译。 但是,我们发现垃圾收集暂停对我们的延迟要求来说太长了:在我们的现实世界程序中超过100毫秒。

以下程序是我们的应用程序的简化版本。 它使用Data.Map.Strict来存储消息。 消息是由Int标识的ByteString 。 按增加的数字顺序插入1000000条消息,并且不断移除最旧的消息以保持历史最多20万条消息。

 module Main (main) where import qualified Control.Exception as Exception import qualified Control.Monad as Monad import qualified Data.ByteString as ByteString import qualified Data.Map.Strict as Map data Msg = Msg !Int !ByteString.ByteString type Chan = Map.Map Int ByteString.ByteString message :: Int -> Msg message n = Msg n (ByteString.replicate 1024 (fromIntegral n)) pushMsg :: Chan -> Msg -> IO Chan pushMsg chan (Msg msgId msgContent) = Exception.evaluate $ let inserted = Map.insert msgId msgContent chan in if 200000 < Map.size inserted then Map.deleteMin inserted else inserted main :: IO () main = Monad.foldM_ pushMsg Map.empty (map message [1..1000000]) 

我们编译并运行这个程序使用:

 $ ghc --version The Glorious Glasgow Haskell Compilation System, version 7.10.3 $ ghc -O2 -optc-O3 Main.hs $ ./Main +RTS -s 3,116,460,096 bytes allocated in the heap 385,101,600 bytes copied during GC 235,234,800 bytes maximum residency (14 sample(s)) 124,137,808 bytes maximum slop 600 MB total memory in use (0 MB lost due to fragmentation) Tot time (elapsed) Avg pause Max pause Gen 0 6558 colls, 0 par 0.238s 0.280s 0.0000s 0.0012s Gen 1 14 colls, 0 par 0.179s 0.250s 0.0179s 0.0515s INIT time 0.000s ( 0.000s elapsed) MUT time 0.652s ( 0.745s elapsed) GC time 0.417s ( 0.530s elapsed) EXIT time 0.010s ( 0.052s elapsed) Total time 1.079s ( 1.326s elapsed) %GC time 38.6% (40.0% elapsed) Alloc rate 4,780,213,353 bytes per MUT second Productivity 61.4% of total user, 49.9% of total elapsed 

这里的重要指标是0.0515秒或51毫秒的“最大暂停”。 我们希望减less至less一个数量级。

实验表明,GC暂停的长度取决于历史logging中的消息数量。 这种关系大致是线性的,也可能是超线性的。 下表显示了这种关系。 ( 您可以在这里看到我们的基准testing ,以及一些图表 。)

 msgs history length max GC pause (ms) =================== ================= 12500 3 25000 6 50000 13 100000 30 200000 56 400000 104 800000 199 1600000 487 3200000 1957 6400000 5378 

我们已经尝试了其他几个variables来找出它们是否可以减less这个延迟,但是没有哪个变化很大。 其中不重要的variables有:优化( -O-O2 ); RTS GC选项( -G-H-A-c ),核心数量( -N ),不同的数据结构( Data.Sequence ),消息的大小以及生成的短期垃圾的数量。 压倒性的决定因素是历史上的消息数量。

我们的工作理论是消息的数量是暂停的,因为每个GC周期必须遍历所有可用的存储器并复制它,这显然是线性操作。

问题:

  • 这个线性时间理论是否正确? GC暂停的长度可以用这种简单的方式来表示,还是现实更复杂?
  • 如果在工作记忆中GC暂停是线性的,有什么办法可以减less所涉及的常数因子?
  • 有增量GC的任何选项,或类似的东西? 我们只能看研究论文。 我们非常愿意以较低的延迟交易吞吐量。
  • 除了分成多个进程之外,是否有任何方法可以为更小的GC循环“分配”内存?

你实际上做得很好,有51ms的暂停时间,超过200Mb的实时数据。 我所使用的系统的最大暂停时间是一半的实时数据量。

你的假设是正确的,主要的GC暂停时间与实时数据量成正比,不幸的是GHC现在还没有办法解决这个问题。 我们过去是用增量GC进行实验,但是这是一个研究项目,没有达到将其折叠到释放的GHC所需的成熟度水平。

有一件事,我们希望将在未来帮助这个紧凑的地区: https : //phabricator.haskell.org/D1264 。 这是一种手动内存pipe理,您可以在堆中压缩结构,GC不必遍历它。 它对于长时间的数据效果最好,但也许在设置中使用单个消息就足够了。 我们的目标是在GHC 8.2.0中。

如果您处于分布式设置并且有某种types的负载平衡器,您可以使用这些技巧来避免暂停命中,您基本上确保负载平衡器不会将请求发送到即将到来的计算机做一个主要的GC,当然确保机器仍然完成GC,即使它没有得到请求。

我已经使用IOVector作为底层数据结构,使用了环形缓冲区方法尝试了您的代码片段。 在我的系统上(GHC 7.10.3,相同的编译选项),这导致最大时间(您在OP中提到的度量)减less了22%。

NB。 我在这里做了两个假设:

  1. 一个可变的数据结构是适合的问题(我猜消息传递意味着无论如何IO)
  2. 你的messageId是连续的

有了一些额外的Int参数和algorithm(就像messageId被重置为0或minBound ),直接判断一个特定的消息是否仍然在历史logging中,然后从ringbuffer中检索出相应的索引。

为了您的testing乐趣:

 import qualified Control.Exception as Exception import qualified Control.Monad as Monad import qualified Data.ByteString as ByteString import qualified Data.Map.Strict as Map import qualified Data.Vector.Mutable as Vector data Msg = Msg !Int !ByteString.ByteString type Chan = Map.Map Int ByteString.ByteString data Chan2 = Chan2 { next :: !Int , maxId :: !Int , ringBuffer :: !(Vector.IOVector ByteString.ByteString) } chanSize :: Int chanSize = 200000 message :: Int -> Msg message n = Msg n (ByteString.replicate 1024 (fromIntegral n)) newChan2 :: IO Chan2 newChan2 = Chan2 0 0 <$> Vector.unsafeNew chanSize pushMsg2 :: Chan2 -> Msg -> IO Chan2 pushMsg2 (Chan2 ix _ store) (Msg msgId msgContent) = let ix' = if ix == chanSize then 0 else ix + 1 in Vector.unsafeWrite store ix' msgContent >> return (Chan2 ix' msgId store) pushMsg :: Chan -> Msg -> IO Chan pushMsg chan (Msg msgId msgContent) = Exception.evaluate $ let inserted = Map.insert msgId msgContent chan in if chanSize < Map.size inserted then Map.deleteMin inserted else inserted main, main1, main2 :: IO () main = main2 main1 = Monad.foldM_ pushMsg Map.empty (map message [1..1000000]) main2 = newChan2 >>= \c -> Monad.foldM_ pushMsg2 c (map message [1..1000000]) 

我必须同意其他观点 – 如果你有严格的实时限制,那么使用GC语言并不理想。

但是,您可能会考虑尝试其他可用的数据结构,而不仅仅是Data.Map。

我使用Data.Sequence重写了它,并得到了一些有希望的改进:

 msgs history length max GC pause (ms) =================== ================= 12500 0.7 25000 1.4 50000 2.8 100000 5.4 200000 10.9 400000 21.8 800000 46 1600000 87 3200000 175 6400000 350 

即使您正在优化延迟,我也注意到其他指标也在改善。 在200000的情况下,执行时间从1.5秒下降到0.2秒,总内存使用量从600MB下降到27MB。

我应该注意到,我通过调整devise来欺骗:

  • 我从Msg删除了Int ,所以它不在两个地方。
  • 我没有使用从IntByteString的映射,而是使用了一个ByteString Sequence ,而不是每个消息一个Int ,我认为可以用一个Int来完成整个Sequence 。 假设消息无法重新sorting,您可以使用一个偏移量来将您想要的消息转换为队列中的位置。

(我包含一个额外的函数getMsg来certificate这一点。)

 {-# LANGUAGE BangPatterns #-} import qualified Control.Exception as Exception import qualified Control.Monad as Monad import qualified Data.ByteString as ByteString import Data.Sequence as S newtype Msg = Msg ByteString.ByteString data Chan = Chan Int (Seq ByteString.ByteString) message :: Int -> Msg message n = Msg (ByteString.replicate 1024 (fromIntegral n)) maxSize :: Int maxSize = 200000 pushMsg :: Chan -> Msg -> IO Chan pushMsg (Chan !offset sq) (Msg msgContent) = Exception.evaluate $ let newSize = 1 + S.length sq newSq = sq |> msgContent in if newSize <= maxSize then Chan offset newSq else case S.viewl newSq of (_ :< newSq') -> Chan (offset+1) newSq' S.EmptyL -> error "Can't happen" getMsg :: Chan -> Int -> Maybe Msg getMsg (Chan offset sq) i_ = getMsg' (i_ - offset) where getMsg' i | i < 0 = Nothing | i >= S.length sq = Nothing | otherwise = Just (Msg (S.index sq i)) main :: IO () main = Monad.foldM_ pushMsg (Chan 0 S.empty) (map message [1..5 * maxSize]) 

那么你发现GC语言的限制:它们不适合硬核实时系统。

你有2个选项:

1st增加堆大小,并使用2级caching系统,最旧的消息被发送到磁盘,并保留最新的消息在内存中,你可以通过使用OS分页。 但是,这个解决scheme的问题在于,根据所使用的辅助存储单元的读取能力,寻呼可能是昂贵的。

第二个程序使用'C'的解决scheme,并将其与FFI连接到haskell。 这样你可以做你自己的内存pipe理。 这将是最好的select,因为你可以控制你自己需要的内存。