TThreadedQueue没有能力的多个消费者?

尝试在单个生产者多个消费者scheme中使用TThreadedQueue(Generics.Collections)。 (DELPHI-XE)。 这个想法是推入对象到一个队列,让几个工作线程排空队列。

尽pipe如此,它没有像预期的那样工作。 当两个或两个以上工作线程调用PopItem时,访问冲突从TThreadedQueue抛出。

如果对PopItem的调用是使用临界区域进行序列化的,则一切正常。

当然TThreadedQueue应该能够处理多个消费者,所以我错过了什么,或者这是TThreadedQueue纯粹的错误?

这是一个简单的例子来产生错误。

program TestThreadedQueue; {$APPTYPE CONSOLE} uses // FastMM4 in '..\..\..\FastMM4\FastMM4.pas', Windows, Messages, Classes, SysUtils, SyncObjs, Generics.Collections; type TThreadTaskMsg = class(TObject) private threadID : integer; threadMsg : string; public Constructor Create( ID : integer; const msg : string); end; type TThreadReader = class(TThread) private fPopQueue : TThreadedQueue<TObject>; fSync : TCriticalSection; fMsg : TThreadTaskMsg; fException : Exception; procedure DoSync; procedure DoHandleException; public Constructor Create( popQueue : TThreadedQueue<TObject>; sync : TCriticalSection); procedure Execute; override; end; Constructor TThreadReader.Create( popQueue : TThreadedQueue<TObject>; sync : TCriticalSection); begin fPopQueue:= popQueue; fMsg:= nil; fSync:= sync; Self.FreeOnTerminate:= FALSE; fException:= nil; Inherited Create( FALSE); end; procedure TThreadReader.DoSync ; begin WriteLn(fMsg.threadMsg + ' ' + IntToStr(fMsg.threadId)); end; procedure TThreadReader.DoHandleException; begin WriteLn('Exception ->' + fException.Message); end; procedure TThreadReader.Execute; var signal : TWaitResult; begin NameThreadForDebugging('QueuePop worker'); while not Terminated do begin try {- Calling PopItem can return empty without waittime !? Let other threads in by sleeping. } Sleep(20); {- Serializing calls to PopItem works } if Assigned(fSync) then fSync.Enter; try signal:= fPopQueue.PopItem( TObject(fMsg)); finally if Assigned(fSync) then fSync.Release; end; if (signal = wrSignaled) then begin try if Assigned(fMsg) then begin fMsg.threadMsg:= '<Thread id :' +IntToStr( Self.threadId) + '>'; fMsg.Free; // We are just dumping the message in this test //Synchronize( Self.DoSync); //PostMessage( fParentForm.Handle,WM_TestQueue_Message,Cardinal(fMsg),0); end; except on E:Exception do begin end; end; end; except FException:= Exception(ExceptObject); try if not (FException is EAbort) then begin {Synchronize(} DoHandleException; //); end; finally FException:= nil; end; end; end; end; Constructor TThreadTaskMsg.Create( ID : Integer; Const msg : string); begin Inherited Create; threadID:= ID; threadMsg:= msg; end; var fSync : TCriticalSection; fThreadQueue : TThreadedQueue<TObject>; fReaderArr : array[1..4] of TThreadReader; i : integer; begin try IsMultiThread:= TRUE; fSync:= TCriticalSection.Create; fThreadQueue:= TThreadedQueue<TObject>.Create(1024,1,100); try {- Calling without fSync throws exceptions when two or more threads calls PopItem at the same time } WriteLn('Creating worker threads ...'); for i:= 1 to 4 do fReaderArr[i]:= TThreadReader.Create( fThreadQueue,Nil); {- Calling with fSync works ! } //for i:= 1 to 4 do fReaderArr[i]:= TThreadReader.Create( fThreadQueue,fSync); WriteLn('Init done. Pushing items ...'); for i:= 1 to 100 do fThreadQueue.PushItem( TThreadTaskMsg.Create( i,'')); ReadLn; finally for i:= 1 to 4 do fReaderArr[i].Free; fThreadQueue.Free; fSync.Free; end; except on E: Exception do begin Writeln(E.ClassName, ': ', E.Message); ReadLn; end; end; end. 

更新 :导致TThreadedQueue崩溃的TMonitor中的错误在Delphi XE2中得到了修复。

更新2 :上面的testing强调了在空状态下的队列。 Darian Miller发现强调队列处于满状态,仍然可能再现XE2中的错误。 该错误再次在TMonitor中。 请参阅下面的答案以获取更多信息。 也是QC101114的链接。

更新3 :随着delphiXE2更新4有一个TMonitor解决TMonitor ,可以解决TMonitor中的问题。 到目前为止,我的testing不能再现TThreadedQueue任何错误。 testing单个生产者/多个消费者线程时,队列是空的和满的。 还testing了多个生产者/多个消费者。 我将阅读器线程和写入器线程从1改为100,没有任何故障。 但是了解历史,我敢打破别人打破TMonitor

那么,很难确定没有很多testing,但肯定看起来这是一个错误,无论是在TThreadedQueue还是在TMonitor。 无论哪种方式,它在RTL中,而不是你的代码。 你应该把这个文件作为QC报告,并用上面的例子作为“如何重现”代码。

你的例子似乎在XE2下正常工作,但是如果我们填充你的队列,它会失败与一个PushItem AV。 (在XE2 Update1下testing)

要重现,只需将您的任务创build从100增加到1100(您的队列深度设置为1024)

 for i:= 1 to 1100 do fThreadQueue.PushItem( TThreadTaskMsg.Create( i,'')); 

这在我每次在Windows 7上都会死掉。我最初尝试着不断地推动它进行压力testing,然后在循环30失败…然后在循环16 …然后在65以不同的时间间隔,但是在一些时候一直失败点。

  iLoop := 0; while iLoop < 1000 do begin Inc(iLoop); WriteLn('Loop: ' + IntToStr(iLoop)); for i:= 1 to 100 do fThreadQueue.PushItem( TThreadTaskMsg.Create( i,'')); end; 

我寻找TThreadedQueue类,但似乎没有在我的D2009。 我不完全会自杀这个 – Delphi线程支持一直是错误的.. errm …“非最佳”,我怀疑TThreadedQueue是没有不同:)

为什么使用PC(生产者/消费者)对象的generics? 一个简单的TObjectQueue后代将会很好地使用这个数十年 – 可以和多个生产者/消费者一起工作:

 unit MinimalSemaphorePCqueue; { Absolutely minimal PC queue based on TobjectQueue and a semaphore. The semaphore count reflects the queue count 'push' will always succeed unless memory runs out, then you're stuft anyway. 'pop' has a timeout parameter as well as the address of where any received object is to be put. 'pop' returns immediately with 'true' if there is an object on the queue available for it. 'pop' blocks the caller if the queue is empty and the timeout is not 0. 'pop' returns false if the timeout is exceeded before an object is available from the queue. 'pop' returns true if an object is available from the queue before the timeout is exceeded. If multiple threads have called 'pop' and are blocked because the queue is empty, a single 'push' will make only one of the waiting threads ready. Methods to push/pop from the queue A 'semaHandle' property that can be used in a 'waitForMultipleObjects' call. When the handle is signaled, the 'peek' method will retrieve the queued object. } interface uses Windows, Messages, SysUtils, Classes,syncObjs,contnrs; type pObject=^Tobject; TsemaphoreMailbox=class(TobjectQueue) private countSema:Thandle; protected access:TcriticalSection; public property semaHandle:Thandle read countSema; constructor create; virtual; procedure push(aObject:Tobject); virtual; function pop(pResObject:pObject;timeout:DWORD):boolean; virtual; function peek(pResObject:pObject):boolean; virtual; destructor destroy; override; end; implementation { TsemaphoreMailbox } constructor TsemaphoreMailbox.create; begin {$IFDEF D2009} inherited Create; {$ELSE} inherited create; {$ENDIF} access:=TcriticalSection.create; countSema:=createSemaphore(nil,0,maxInt,nil); end; destructor TsemaphoreMailbox.destroy; begin access.free; closeHandle(countSema); inherited; end; function TsemaphoreMailbox.pop(pResObject: pObject; timeout: DWORD): boolean; // dequeues an object, if one is available on the queue. If the queue is empty, // the caller is blocked until either an object is pushed on or the timeout // period expires begin // wait for a unit from the semaphore result:=(WAIT_OBJECT_0=waitForSingleObject(countSema,timeout)); if result then // if a unit was supplied before the timeout, begin access.acquire; try pResObject^:=inherited pop; // get an object from the queue finally access.release; end; end; end; procedure TsemaphoreMailbox.push(aObject: Tobject); // pushes an object onto the queue. If threads are waiting in a 'pop' call, // one of them is made ready. begin access.acquire; try inherited push(aObject); // shove the object onto the queue finally access.release; end; releaseSemaphore(countSema,1,nil); // release one unit to semaphore end; function TsemaphoreMailbox.peek(pResObject: pObject): boolean; begin access.acquire; try result:=(count>0); if result then pResObject^:=inherited pop; // get an object from the queue finally access.release; end; end; end. 

我不认为TThreadedQueue应该支持多个消费者。 这是一个FIFO,按照帮助文件。 我的印象是有一个线程推动,另一个(只有一个!)popup。