Essayez d'utiliser le TThreadedQueue (Génériques.Collections) dans un seul producteur de plusieurs consommateurs régime. (Delphi XE). L'idée est de pousser des objets dans une file d'attente et de laisser plusieurs threads de travail vidange de la file d'attente.
Il ne fonctionne pas comme prévu, si. Lorsque deux ou plusieurs threads sont appel PopItem, des violations d'accès sont jetés par les TThreadedQueue.
Si l'appel à PopItem est sérialisé avec une section critique, tout est très bien.
Sûrement le TThreadedQueue doit être capable de gérer plusieurs consommateurs, alors j'ai loupé quelque chose ou est-ce un pur bug dans TThreadedQueue ?
Voici un exemple simple pour produire de l'erreur.
program TestThreadedQueue;
{$APPTYPE CONSOLE}
uses
// FastMM4 in '..\..\..\FastMM4\FastMM4.pas',
Windows,
Messages,
Classes,
SysUtils,
SyncObjs,
Generics.Collections;
type TThreadTaskMsg =
class(TObject)
private
threadID : integer;
threadMsg : string;
public
Constructor Create( ID : integer; const msg : string);
end;
type TThreadReader =
class(TThread)
private
fPopQueue : TThreadedQueue<TObject>;
fSync : TCriticalSection;
fMsg : TThreadTaskMsg;
fException : Exception;
procedure DoSync;
procedure DoHandleException;
public
Constructor Create( popQueue : TThreadedQueue<TObject>;
sync : TCriticalSection);
procedure Execute; override;
end;
Constructor TThreadReader.Create( popQueue : TThreadedQueue<TObject>;
sync : TCriticalSection);
begin
fPopQueue:= popQueue;
fMsg:= nil;
fSync:= sync;
Self.FreeOnTerminate:= FALSE;
fException:= nil;
Inherited Create( FALSE);
end;
procedure TThreadReader.DoSync ;
begin
WriteLn(fMsg.threadMsg + ' ' + IntToStr(fMsg.threadId));
end;
procedure TThreadReader.DoHandleException;
begin
WriteLn('Exception ->' + fException.Message);
end;
procedure TThreadReader.Execute;
var signal : TWaitResult;
begin
NameThreadForDebugging('QueuePop worker');
while not Terminated do
begin
try
{- Calling PopItem can return empty without waittime !? Let other threads in by sleeping. }
Sleep(20);
{- Serializing calls to PopItem works }
if Assigned(fSync) then fSync.Enter;
try
signal:= fPopQueue.PopItem( TObject(fMsg));
finally
if Assigned(fSync) then fSync.Release;
end;
if (signal = wrSignaled) then
begin
try
if Assigned(fMsg) then
begin
fMsg.threadMsg:= '<Thread id :' +IntToStr( Self.threadId) + '>';
fMsg.Free; // We are just dumping the message in this test
//Synchronize( Self.DoSync);
//PostMessage( fParentForm.Handle,WM_TestQueue_Message,Cardinal(fMsg),0);
end;
except
on E:Exception do begin
end;
end;
end;
except
FException:= Exception(ExceptObject);
try
if not (FException is EAbort) then
begin
{Synchronize(} DoHandleException; //);
end;
finally
FException:= nil;
end;
end;
end;
end;
Constructor TThreadTaskMsg.Create( ID : Integer; Const msg : string);
begin
Inherited Create;
threadID:= ID;
threadMsg:= msg;
end;
var
fSync : TCriticalSection;
fThreadQueue : TThreadedQueue<TObject>;
fReaderArr : array[1..4] of TThreadReader;
i : integer;
begin
try
IsMultiThread:= TRUE;
fSync:= TCriticalSection.Create;
fThreadQueue:= TThreadedQueue<TObject>.Create(1024,1,100);
try
{- Calling without fSync throws exceptions when two or more threads calls PopItem
at the same time }
WriteLn('Creating worker threads ...');
for i:= 1 to 4 do fReaderArr[i]:= TThreadReader.Create( fThreadQueue,Nil);
{- Calling with fSync works ! }
//for i:= 1 to 4 do fReaderArr[i]:= TThreadReader.Create( fThreadQueue,fSync);
WriteLn('Init done. Pushing items ...');
for i:= 1 to 100 do fThreadQueue.PushItem( TThreadTaskMsg.Create( i,''));
ReadLn;
finally
for i:= 1 to 4 do fReaderArr[i].Free;
fThreadQueue.Free;
fSync.Free;
end;
except
on E: Exception do
begin
Writeln(E.ClassName, ': ', E.Message);
ReadLn;
end;
end;
end.
Mise à jour : L'erreur dans TMonitor qui a causé TThreadedQueue panne est résolu dans Delphi XE2.
Mise à jour 2 : Le test ci-dessus a souligné la file d'attente dans le vide. Darian Miller trouvé que le fait de souligner la file d'attente à pleine état, encore capable de reproduire l'erreur dans XE2. L'erreur est de nouveau dans le TMonitor. Voir sa réponse ci-dessous pour plus d'informations. Et aussi un lien vers le QC101114.
Mise à jour 3:
Avec Delphi XE2 mise à jour 4, il était annoncé pour fixer TMonitor
qui permettrait de guérir les problèmes en TThreadedQueue
. Mes tests ne sont pas en mesure de reproduire les erreurs en TThreadedQueue
plus.
Testé seul producteur/plusieurs threads consommateurs lors de la file d'attente est vide et le plein.
Également testé plusieurs producteurs et plusieurs consommateurs. J'ai varié les threads lecteur et écrivain, fils de 1 à 100, sans aucun problème. Mais connaissant l'histoire, je n'ose les autres à briser TMonitor
.