StaTaskScheduler和STA线程消息抽取

TL; DR: StaTaskScheduler运行的任务内的死锁。 长版本:

我使用Parallel Team的ParallelExtensionsExtras中的StaTaskScheduler来托pipe由第三方提供的一些传统的STA COM对象。 StaTaskScheduler实现细节的描述如下:

好消息是TPL的实现能够在MTA或者STA线程上运行,并且考虑到WaitHandle.WaitAll(当方法提供了多个等待句柄时只支持MTA线程)等底层API的相关差异。

我认为这意味着TPL的阻塞部分将使用一个等待API的消息,如CoWaitForMultipleHandles ,以避免在STA线程上调用死锁情况。

在我的情况下,我相信以下情况正在发生:进程内的STA COM对象A调用了对象之外的对象B,然后期望从B通过作为传出呼叫的一部分的callback。

简化forms:

 var result = await Task.Factory.StartNew(() => { // in-proc object A var a = new A(); // out-of-proc object B var b = new B(); // A calls B and B calls back A during the Method call return a.Method(b); }, CancellationToken.None, TaskCreationOptions.None, staTaskScheduler); 

问题是, a.Method(b)永远不会返回。 据我所知,发生这种情况是因为阻塞在BlockingCollection<Task>内部的某个地方不会泵送消息,所以我对引用语句的假设可能是错误的。

EDITED相同的代码在testingWinForms应用程序的UI线程上执行时(即,将TaskScheduler.FromCurrentSynchronizationContext()而不是staTaskSchedulerTask.Factory.StartNew )。

什么是解决这个问题的正确方法? 我是否应该实现一个自定义同步上下文,它将显式地用CoWaitForMultipleHandles抽取消息,并将其安装在由StaTaskScheduler启动的每个STA线程上?

如果是这样, BlockingCollection的底层实现是否会调用我的SynchronizationContext.Wait方法? 我可以使用SynchronizationContext.WaitHelper来实现SynchronizationContext.Wait吗?


用一些代码进行了编辑 ,这些代码显示了在执行阻塞等待时,托pipe的STA线程不会被泵送。 该代码是一个完整的控制台应用程序准备复制/粘贴/运行:

 using System; using System.Collections.Concurrent; using System.Runtime.InteropServices; using System.Threading; using System.Threading.Tasks; namespace ConsoleTestApp { class Program { // start and run an STA thread static void RunStaThread(bool pump) { // test a blocking wait with BlockingCollection.Take var tasks = new BlockingCollection<Task>(); var thread = new Thread(() => { // Create a simple Win32 window var hwndStatic = NativeMethods.CreateWindowEx(0, "Static", String.Empty, NativeMethods.WS_POPUP, 0, 0, 0, 0, IntPtr.Zero, IntPtr.Zero, IntPtr.Zero, IntPtr.Zero); // subclass it with a custom WndProc IntPtr prevWndProc = IntPtr.Zero; var newWndProc = new NativeMethods.WndProc((hwnd, msg, wParam, lParam) => { if (msg == NativeMethods.WM_TEST) Console.WriteLine("WM_TEST processed"); return NativeMethods.CallWindowProc(prevWndProc, hwnd, msg, wParam, lParam); }); prevWndProc = NativeMethods.SetWindowLong(hwndStatic, NativeMethods.GWL_WNDPROC, newWndProc); if (prevWndProc == IntPtr.Zero) throw new ApplicationException(); // post a test WM_TEST message to it NativeMethods.PostMessage(hwndStatic, NativeMethods.WM_TEST, IntPtr.Zero, IntPtr.Zero); // BlockingCollection blocks without pumping, NativeMethods.WM_TEST never arrives try { var task = tasks.Take(); } catch (Exception e) { Console.WriteLine(e.Message); } if (pump) { // NativeMethods.WM_TEST will arrive, because Win32 MessageBox pumps Console.WriteLine("Now start pumping..."); NativeMethods.MessageBox(IntPtr.Zero, "Pumping messages, press OK to stop...", String.Empty, 0); } }); thread.SetApartmentState(ApartmentState.STA); thread.Start(); Thread.Sleep(2000); // this causes the STA thread to end tasks.CompleteAdding(); thread.Join(); } static void Main(string[] args) { Console.WriteLine("Testing without pumping..."); RunStaThread(false); Console.WriteLine("\nTest with pumping..."); RunStaThread(true); Console.WriteLine("Press Enter to exit"); Console.ReadLine(); } } // Interop static class NativeMethods { [DllImport("user32")] public static extern IntPtr SetWindowLong(IntPtr hwnd, int nIndex, WndProc newProc); [DllImport("user32")] public static extern IntPtr CallWindowProc(IntPtr lpPrevWndFunc, IntPtr hwnd, int msg, int wParam, int lParam); [DllImport("user32.dll")] public static extern IntPtr CreateWindowEx(int dwExStyle, string lpClassName, string lpWindowName, int dwStyle, int x, int y, int nWidth, int nHeight, IntPtr hWndParent, IntPtr hMenu, IntPtr hInstance, IntPtr lpParam); [DllImport("user32.dll")] public static extern bool PostMessage(IntPtr hwnd, uint msg, IntPtr wParam, IntPtr lParam); [DllImport("user32.dll")] public static extern int MessageBox(IntPtr hwnd, string text, String caption, int options); public delegate IntPtr WndProc(IntPtr hwnd, int msg, int wParam, int lParam); public const int GWL_WNDPROC = -4; public const int WS_POPUP = unchecked((int)0x80000000); public const int WM_USER = 0x0400; public const int WM_TEST = WM_USER + 1; } } 

这产生输出:

没有泵抽testing
收集参数是空的,并且在添加方面已经被标记为完整的。

testing与泵...
收集参数是空的,并且在添加方面已经被标记为完整的。
现在开始抽...
 WM_TEST处理
按Enter键退出

我对您的问题的理解:您正在使用StaTaskScheduler只为您的传统COM对象组织经典COM STA公寓。 您没有StaTaskScheduler的STA线程上运行WinForms或WPF核心消息循环。 也就是说,你并没有在该线程中使用像Application.RunApplication.DoEventsDispatcher.PushFrame类的东西。 纠正我,如果这是一个错误的假设。

就其本身而言, StaTaskScheduler 不会在其创build的STA线程上安装任何同步上下文。 因此,您依靠CLR为您提供信息。 我只发现了一个隐含的确认:CLR在STA线程上抽水,在 Chris和Brumme 的CLR中抽水:

我一直说,在STA线程上调用时,pipe理阻塞将执行“一些抽取”。 不知道究竟会得到什么? 不幸的是,抽水是一种超出凡人理解的黑色艺术。 在Win2000及之后,我们简单地委托给OLE32的CoWaitForMultipleHandles服务。

这表明CLR CoWaitForMultipleHandles内部为STA线程使用CoWaitForMultipleHandles 。 此外, COWAIT_DISPATCH_WINDOW_MESSAGES标志的MSDN文档提到了这一点 :

…在STA中仅仅是一小组特殊情况下发送的消息。

我做了一些研究 ,但无法从WM_TEST抽取示例代码中的CoWaitForMultipleHandles ,我们在对您的问题的评论中讨论过。 我的理解是,前面提到的一小组特殊的消息 实际上仅限于某些COM编组特定的消息,并不包含像WM_TEST这样的常规通用消息。

所以,要回答你的问题:

…我应该实现一个自定义的同步上下文,它将明确地泵送消息与CoWaitForMultipleHandles,并将其安装在由StaTaskScheduler启动的每个STA线程?

是的,我相信创build一个自定义同步上下文并重写SynchronizationContext.Wait确实是正确的解决scheme。

但是,您应该避免使用CoWaitForMultipleHandles而是使用MsgWaitForMultipleObjectsEx 。 如果MsgWaitForMultipleObjectsEx指示队列中有未决消息,则应使用PeekMessage(PM_REMOVE)DispatchMessage手动抽取消息。 那么你应该继续等待这个句柄,都在同一个SynchronizationContext.Wait调用里面。

请注意, MsgWaitForMultipleObjectsExMsgWaitForMultipleObjects之间存在细微但重要的区别 。 如果在队列中已经看到消息(例如,使用PeekMessage(PM_NOREMOVE)GetQueueStatus ),则后者不会返回并保持阻塞,但是不会被移除。 这对于抽取是不好的,因为你的COM对象可能使用类似PeekMessage东西来检查消息队列。 这可能以后会导致MsgWaitForMultipleObjects阻止时,不预期。

OTOH,具有MWMO_INPUTAVAILABLE标志的MsgWaitForMultipleObjectsEx没有这样的缺点,并且在这种情况下将返回。

前一段时间,我创build了一个定制版本的StaTaskScheduler ( 在这里以ThreadAffinityTaskScheduler ),试图解决一个不同的问题 :维护一个具有线程相关性的线程池,以便随后await延续。 如果跨多个awaits使用STA COM对象,则线程关系是至关重要的 。 原始的StaTaskScheduler仅在其池限制为1个线程时StaTaskScheduler出现此行为。

所以我继续做了一些WM_TEST实验。 最初,我在STA线程上安装了一个标准的SynchronizationContext类的实例。 WM_TEST消息没有被抽出,这是预期的。

然后我重写SynchronizationContext.Wait只是将其转发到SynchronizationContext.WaitHelper 。 它确实被叫了,但仍然没有泵。

最后,我实现了一个全function的消息泵循环,这是它的核心部分:

 // the core loop var msg = new NativeMethods.MSG(); while (true) { // MsgWaitForMultipleObjectsEx with MWMO_INPUTAVAILABLE returns, // even if there's a message already seen but not removed in the message queue nativeResult = NativeMethods.MsgWaitForMultipleObjectsEx( count, waitHandles, (uint)remainingTimeout, QS_MASK, NativeMethods.MWMO_INPUTAVAILABLE); if (IsNativeWaitSuccessful(count, nativeResult, out managedResult) || WaitHandle.WaitTimeout == managedResult) return managedResult; // there is a message, pump and dispatch it if (NativeMethods.PeekMessage(out msg, IntPtr.Zero, 0, 0, NativeMethods.PM_REMOVE)) { NativeMethods.TranslateMessage(ref msg); NativeMethods.DispatchMessage(ref msg); } if (hasTimedOut()) return WaitHandle.WaitTimeout; } 

这是行不通的, WM_TEST被抽了出来。 以下是您testing的改编版本:

 public static async Task RunAsync() { using (var staThread = new Noseratio.ThreadAffinity.ThreadWithAffinityContext(staThread: true, pumpMessages: true)) { Console.WriteLine("Initial thread #" + Thread.CurrentThread.ManagedThreadId); await staThread.Run(async () => { Console.WriteLine("On STA thread #" + Thread.CurrentThread.ManagedThreadId); // create a simple Win32 window IntPtr hwnd = CreateTestWindow(); // Post some WM_TEST messages Console.WriteLine("Post some WM_TEST messages..."); NativeMethods.PostMessage(hwnd, NativeMethods.WM_TEST, new IntPtr(1), IntPtr.Zero); NativeMethods.PostMessage(hwnd, NativeMethods.WM_TEST, new IntPtr(2), IntPtr.Zero); NativeMethods.PostMessage(hwnd, NativeMethods.WM_TEST, new IntPtr(3), IntPtr.Zero); Console.WriteLine("Press Enter to continue..."); await ReadLineAsync(); Console.WriteLine("After await, thread #" + Thread.CurrentThread.ManagedThreadId); Console.WriteLine("Pending messages in the queue: " + (NativeMethods.GetQueueStatus(0x1FF) >> 16 != 0)); Console.WriteLine("Exiting STA thread #" + Thread.CurrentThread.ManagedThreadId); }, CancellationToken.None); } Console.WriteLine("Current thread #" + Thread.CurrentThread.ManagedThreadId); } 

输出

初始线程#9
在STA线程#10上
发布一些WM_TEST消息...
按Enter继续...
 WM_TEST处理:1
 WM_TEST处理:2
 WM_TEST处理:3

等待后,线程#10
等待队列中的消息:False
正在退出STA线程#10
当前线程#12
按任何一个键退出 

注意,这个实现支持线程关联(在await之后它保持在线程#10)和消息抽取。 完整的源代码包含可重用的部分( ThreadAffinityTaskSchedulerThreadWithAffinityContext ),并可在此处作为自包含的控制台应用程序使用 。 它没有经过彻底的testing,所以使用它需要您自担风险。

STA线程抽取的主题是很大的一个,很less有程序员有一个愉快的时间解决死锁。 关于它的开创性论文是由Chris Brumme撰写的。 你会在这个博客文章中find它。 不幸的是,在具体细节方面还是比较短缺的,他并没有超越,注意到CLR只是抽了一点 ,却没有详细的规定。

他在.NET 2.0中join的代码出现在名为MsgWaitHelper()的内部CLR函数中。 .NET 2.0的源代码可以通过SSCLI20发行版获得。 非常完整,但不包括MsgWaitHelper()的源代码。 很不寻常。 反编译是一个失败的原因,它是非常大的。

从他的博客文章中拿走的一件事是再次入侵的危险。 当STA程序处于不正确的状态以允许这样的代码执行时,在STA线程中进行泵送对于调度Windows消息的能力是危险的,并且获取任意代码来执行。 VB6程序员知道什么时候使用DoEvents()来获取代码中的模态循环来停止冻结UI。 我写了一篇关于最典型危险的文章。 MsgWaitHelper()完成这种抽取操作,然而它对于允许运行的代码是非常有select性的。

您可以通过运行程序而不需要附加debugging程序,然后附加非托pipedebugging程序,来了解它在testing程序中的作用。 你会看到它在NtWaitForMultipleObjects()上被阻塞。 我更进了一步,在PeekMessageW()上设置一个断点来获取这个堆栈跟踪:

 user32.dll!PeekMessageW() Unknown combase.dll!CCliModalLoop::MyPeekMessage(tagMSG * pMsg, HWND__ * hwnd, unsigned int min, unsigned int max, unsigned short wFlag) Line 2305 C++ combase.dll!CCliModalLoop::PeekRPCAndDDEMessage() Line 2008 C++ combase.dll!CCliModalLoop::FindMessage(unsigned long dwStatus) Line 2087 C++ combase.dll!CCliModalLoop::HandleWakeForMsg() Line 1707 C++ combase.dll!CCliModalLoop::BlockFn(void * * ahEvent, unsigned long cEvents, unsigned long * lpdwSignaled) Line 1645 C++ combase.dll!ClassicSTAThreadWaitForHandles(unsigned long dwFlags, unsigned long dwTimeout, unsigned long cHandles, void * * pHandles, unsigned long * pdwIndex) Line 46 C++ combase.dll!CoWaitForMultipleHandles(unsigned long dwFlags, unsigned long dwTimeout, unsigned long cHandles, void * * pHandles, unsigned long * lpdwindex) Line 120 C++ clr.dll!MsgWaitHelper(int,void * *,int,unsigned long,int) Unknown clr.dll!Thread::DoAppropriateWaitWorker(int,void * *,int,unsigned long,enum WaitMode) Unknown clr.dll!Thread::DoAppropriateWait(int,void * *,int,unsigned long,enum WaitMode,struct PendingSync *) Unknown clr.dll!CLREventBase::WaitEx(unsigned long,enum WaitMode,struct PendingSync *) Unknown clr.dll!CLREventBase::Wait(unsigned long,int,struct PendingSync *) Unknown clr.dll!Thread::Block(int,struct PendingSync *) Unknown clr.dll!SyncBlock::Wait(int,int) Unknown clr.dll!ObjectNative::WaitTimeout(bool,int,class Object *) Unknown 

注意我在Windows 8.1上logging了这个堆栈跟踪,在旧的Windows版本上看起来会有很大的不同。 在Windows 8中,COM模式循环已经被大量修改,这对于WinRT程序来说也是非常重要的。 不知道那么多,但似乎有另一个STA线程模型,名为ASTA,在更多的CoWaitForMultipleObjects()中提供了一个更严格的抽水,

ObjectNative :: WaitTimeout()是BlockingCollection.Take()方法内的SemaphoreSlim.Wait()开始执行CLR代码的地方。 你会看到它通过内部CLR代码级别来到神秘的MsgWaitHelper()函数,然后切换到臭名昭着的COM模式调度器循环。

它在你的程序中做“错误的”types的蝙蝠信号标志是调用CliModalLoop :: PeekRPCAndDDEMessage()方法。 换句话说, 它只考虑发送到特定内部窗口的interop消息的types,该窗口派发跨越公寓边界的COM调用。 它不会为您自己的窗口泵送消息队列中的消息。

这是可以理解的行为,Windows只能够绝对确保重新进入不会在您的程序看到您的UI线程空闲时终止程序。 当它抽取消息循环本身时,它是空闲的,调用PeekMessage()或GetMessage()指示该状态。 问题是,你不能抽空自己。 您违反了STA线程的核心协议,它必须抽取消息循环。 希望COM模式的循环能为你抽水是一种闲置的希望。

你可以真的解决这个问题,即使我不build议你这样做。 CLR将把它留给应用程序本身来执行一个正确构造的SynchronizationContext.Current对象的等待。 您可以通过派生自己的类并重写Wait()方法来创build一个类。 调用SetWaitNotificationRequired()方法来说服CLR应该保留给你。 一个不完整的版本,演示的方法:

 class MySynchronizationProvider : System.Threading.SynchronizationContext { public MySynchronizationProvider() { base.SetWaitNotificationRequired(); } public override int Wait(IntPtr[] waitHandles, bool waitAll, int millisecondsTimeout) { for (; ; ) { int result = MsgWaitForMultipleObjects(waitHandles.Length, waitHandles, waitAll, millisecondsTimeout, 8); if (result == waitHandles.Length) System.Windows.Forms.Application.DoEvents(); else return result; } } [DllImport("user32.dll")] private static extern int MsgWaitForMultipleObjects(int cnt, IntPtr[] waitHandles, bool waitAll, int millisecondTimeout, int mask); } 

并在你的线程开始时安装它:

  System.ComponentModel.AsyncOperationManager.SynchronizationContext = new MySynchronizationProvider(); 

您现在将看到您的WM_TEST消息正在分派。 它是调用它的Application.DoEvents()的调用。 我可以通过使用PeekMessage + DispatchMessage来覆盖它,但会混淆此代码的危险,最好不要将DoEvents()放在表下。 你真的在这里玩一个非常危险的重新游戏。 不要使用这个代码。

长话短说,正确使用StaThreadScheduler的唯一希望是在已经实现了STA合同的代码中使用StaThreadScheduler,并且像STA线程那样应该执行泵。 这真的意味着作为一个老代码的创可贴,你不必奢华地控制线程状态。 就像在VB6程序或Office加载项中开始生活的任何代码一样。 尝试了一下,我不认为它实际上可以工作。 值得注意的是,它的需要应该完全消除与asych / await的可用性。