为什么Monitor.PulseAll会在信号线程中产生“阶梯式”延迟模式?

在一个使用Monitor.PulseAll()进行线程同步的库中,我注意到从调用PulseAll(…)的时间到线程被唤醒的时间似乎都遵循着“阶梯”分布 – 大步骤。 清醒的线程几乎没有工作; 几乎立即回到监视器上等待。 例如,在12个监视器上有24个线程的盒子(2个Xeon5680 / Gulftown;每个处理器6个物理内核;禁用HT),Pulse和一个线程唤醒之间的延迟是这样的:

使用Monitor.PulseAll()的延迟;第三方图书馆

前12个线程(注意我们有12个内核)需要30到60微秒来响应。 然后我们开始变得非常大跳跃; 高原大约在700,1300,1900和2600微秒。

使用下面的代码,我能够独立于第三方库成功重新创build此行为。 这段代码所做的是启动大量的线程(改变numThreads参数),这些线程只是在监视器上等待,读取一个时间戳,logging到一个ConcurrentSet,然后立即返回到Waiting。 一旦第二个PulseAll()唤醒了所有的线程。 它执行了这20次,并将第10次迭代的延迟报告给控制台。

using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading; using System.Threading.Tasks; using System.Collections.Concurrent; using System.Diagnostics; namespace PulseAllTest { class Program { static long LastTimestamp; static long Iteration; static object SyncObj = new object(); static Stopwatch s = new Stopwatch(); static ConcurrentBag<Tuple<long, long>> IterationToTicks = new ConcurrentBag<Tuple<long, long>>(); static void Main(string[] args) { long numThreads = 32; for (int i = 0; i < numThreads; ++i) { Task.Factory.StartNew(ReadLastTimestampAndPublish, TaskCreationOptions.LongRunning); } s.Start(); for (int i = 0; i < 20; ++i) { lock (SyncObj) { ++Iteration; LastTimestamp = s.Elapsed.Ticks; Monitor.PulseAll(SyncObj); } Thread.Sleep(TimeSpan.FromSeconds(1)); } Console.WriteLine(String.Join("\n", from n in IterationToTicks where n.Item1 == 10 orderby n.Item2 select ((decimal)n.Item2)/TimeSpan.TicksPerMillisecond)); Console.Read(); } static void ReadLastTimestampAndPublish() { while(true) { lock(SyncObj) { Monitor.Wait(SyncObj); } IterationToTicks.Add(Tuple.Create(Iteration, s.Elapsed.Ticks - LastTimestamp)); } } } } 

使用上面的代码,下面是启用8核/瓦超线程(即任务pipe理器中的16个内核)和32个线程(* 2个Xeon5550 / Gainestown;每个处理器4个物理内核; HT启用)的盒子上的延迟的示例:

使用Monitor.PulseAll()的延迟,示例代码

编辑:试图把NUMA从等式,下面是一个graphics运行示例程序与Core i7-3770(常青藤桥)上的16个线程; 4个物理核心; HT启用:

使用Monitor.PulseAll()的延迟,示例代码,没有NUMA

任何人都可以解释为什么Monitor.PulseAll()这样的行为?

EDIT2:

为了试图certificate这种行为不是同时唤醒一堆线程所固有的,我已经使用Events复制了testing程序的行为; 而不是测量PulseAll()的延迟我正在测量ManualResetEvent.Set()的延迟。 该代码正在创build多个工作线程,然后等待同一个ManualResetEvent对象上的ManualResetEvent.Set()事件。 当事件被触发时,他们进行延迟测量,然后立即等待他们自己的每个线程AutoResetEvent。 在下一次迭代之前(500ms之前),ManualResetEvent是Reset(),然后每个AutoResetEvent都是Set(),这样线程就可以返回等待共享的ManualResetEvent了。

我犹豫发布这个,因为它可能是一个巨大的红色听证会(我没有声称事件和监视器行为类似)加上它使用一些绝对可怕的做法,让一个事件行为像一个监视器(我喜欢/讨厌看看我的如果我把这个提交给代码审查,同事会这样做); 但我认为结果是有启发性的。

这个testing是在原始testing的同一台机器上完成的; 一个2xXeon5680 / Gulftown; 每个处理器6个核心(共12个核心); 超线程被禁用。

ManualResetEventLatency

如果不明显,这是Monitor.PulseAll完全不同; 这里是覆盖在最后一个图表上的第一个graphics:

ManualResetEventLatency与监视器延迟

用于生成这些测量的代码如下所示:

 using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading; using System.Threading.Tasks; using System.Collections.Concurrent; using System.Diagnostics; namespace MRETest { class Program { static long LastTimestamp; static long Iteration; static ManualResetEventSlim MRES = new ManualResetEventSlim(false); static List<ReadLastTimestampAndPublish> Publishers = new List<ReadLastTimestampAndPublish>(); static Stopwatch s = new Stopwatch(); static ConcurrentBag<Tuple<long, long>> IterationToTicks = new ConcurrentBag<Tuple<long, long>>(); static void Main(string[] args) { long numThreads = 24; s.Start(); for (int i = 0; i < numThreads; ++i) { AutoResetEvent ares = new AutoResetEvent(false); ReadLastTimestampAndPublish spinner = new ReadLastTimestampAndPublish( new AutoResetEvent(false)); Task.Factory.StartNew(spinner.Spin, TaskCreationOptions.LongRunning); Publishers.Add(spinner); } for (int i = 0; i < 20; ++i) { ++Iteration; LastTimestamp = s.Elapsed.Ticks; MRES.Set(); Thread.Sleep(500); MRES.Reset(); foreach (ReadLastTimestampAndPublish publisher in Publishers) { publisher.ARES.Set(); } Thread.Sleep(500); } Console.WriteLine(String.Join("\n", from n in IterationToTicks where n.Item1 == 10 orderby n.Item2 select ((decimal)n.Item2) / TimeSpan.TicksPerMillisecond)); Console.Read(); } class ReadLastTimestampAndPublish { public AutoResetEvent ARES { get; private set; } public ReadLastTimestampAndPublish(AutoResetEvent ares) { this.ARES = ares; } public void Spin() { while (true) { MRES.Wait(); IterationToTicks.Add(Tuple.Create(Iteration, s.Elapsed.Ticks - LastTimestamp)); ARES.WaitOne(); } } } } } 

这些版本之间的一个区别是,在PulseAll情况下 – 线程立即重复循环,再次locking对象。

你有12个核心,所以12个线程正在运行,执行循环,再次进入循环,locking对象(一个接一个),然后进入等待状态。 所有的时间其他线程等待。 在ManualEvent的情况下,你有两个事件,所以线程不会立即重复循环,而是在ARES事件上被阻塞 – 这就允许其他线程更快地locking拥有者。

我在PulseAll中通过在ReadLastTimestampAndPublish循环结尾join睡眠来模拟类似的行为。 这让其他线程更快地lockingsyncObj,并且似乎改善了我从程序中获得的数字。

 static void ReadLastTimestampAndPublish() { while(true) { lock(SyncObj) { Monitor.Wait(SyncObj); } IterationToTicks.Add(Tuple.Create(Iteration, s.Elapsed.Ticks - LastTimestamp)); Thread.Sleep(TimeSpan.FromMilliseconds(100)); // <=== } } 

首先,这不是一个答案,只是我看着SSCLI的笔记,找出到底发生了什么。 大部分这是远远高于我的头,但有趣的。

在兔子洞下的旅程开始与Monitor.PulseAll ,这是用C#实现的调用:

clr\src\bcl\system\threading\monitor.cs

 namespace System.Threading { public static class Monitor { // other methods omitted [MethodImplAttribute(MethodImplOptions.InternalCall)] private static extern void ObjPulseAll(Object obj); public static void PulseAll(Object obj) { if (obj==null) { throw new ArgumentNullException("obj"); } ObjPulseAll(obj); } } } 

InternalCall方法被路由到clr\src\vm\ecall.cpp

 FCFuncStart(gMonitorFuncs) FCFuncElement("Enter", JIT_MonEnter) FCFuncElement("Exit", JIT_MonExit) FCFuncElement("TryEnterTimeout", JIT_MonTryEnter) FCFuncElement("ObjWait", ObjectNative::WaitTimeout) FCFuncElement("ObjPulse", ObjectNative::Pulse) FCFuncElement("ObjPulseAll", ObjectNative::PulseAll) FCFuncElement("ReliableEnter", JIT_MonReliableEnter) FCFuncEnd() 

ObjectNative生活在clr\src\vm\comobject.cpp

 FCIMPL1(void, ObjectNative::PulseAll, Object* pThisUNSAFE) { CONTRACTL { MODE_COOPERATIVE; DISABLED(GC_TRIGGERS); // can't use this in an FCALL because we're in forbid gc mode until we setup a H_M_F. THROWS; SO_TOLERANT; } CONTRACTL_END; OBJECTREF pThis = (OBJECTREF) pThisUNSAFE; HELPER_METHOD_FRAME_BEGIN_1(pThis); //-[autocvtpro]------------------------------------------------------- if (pThis == NULL) COMPlusThrow(kNullReferenceException, L"NullReference_This"); pThis->PulseAll(); //-[autocvtepi]------------------------------------------------------- HELPER_METHOD_FRAME_END(); } FCIMPLEND 

OBJECTREF是一些神奇的东西洒在Object之上( ->运算符被重载),所以OBJECTREF->PulseAll()实际上是Object->PulseAll() ,它在clr\src\vm\object.h ,只是将调用ObjHeader->PulseAll

 class Object { // snip public: // snip ObjHeader *GetHeader() { LEAF_CONTRACT; return PTR_ObjHeader(PTR_HOST_TO_TADDR(this) - sizeof(ObjHeader)); } // snip void PulseAll() { WRAPPER_CONTRACT; GetHeader()->PulseAll(); } // snip } 

ObjHeader::PulseAll获取SyncBlock ,它使用AwareLockEnterExit对象上的锁。 AwareLockclr\src\vm\syncblk.cpp )使用创build为MonitorEventCLREvent::CreateMonitorEvent(SIZE_T) )的CLREventclr\src\vm\synch.cpp ),该调用调用UnsafeCreateEventclr\src\inc\unsafe.h )或主机环境的同步方法。

clr\src\vm\syncblk.cpp

 void ObjHeader::PulseAll() { CONTRACTL { INSTANCE_CHECK; THROWS; GC_TRIGGERS; MODE_ANY; INJECT_FAULT(COMPlusThrowOM();); } CONTRACTL_END; // The following code may cause GC, so we must fetch the sync block from // the object now in case it moves. SyncBlock *pSB = GetBaseObject()->GetSyncBlock(); // GetSyncBlock throws on failure _ASSERTE(pSB != NULL); // make sure we own the crst if (!pSB->DoesCurrentThreadOwnMonitor()) COMPlusThrow(kSynchronizationLockException); pSB->PulseAll(); } void SyncBlock::PulseAll() { CONTRACTL { INSTANCE_CHECK; NOTHROW; GC_NOTRIGGER; MODE_ANY; } CONTRACTL_END; WaitEventLink *pWaitEventLink; while ((pWaitEventLink = ThreadQueue::DequeueThread(this)) != NULL) pWaitEventLink->m_EventWait->Set(); } 

DequeueThread使用crstclr\src\vm\crst.cpp ),它是关键部分的包装。 m_EventWait是一个手动CLREvent

所以,所有这些都是使用操作系统的原语,除非默认的托pipe服务提供商是重写的东西。