43 votes

TThreadedQueue pas capable de plusieurs consommateurs?

Essayez d'utiliser le TThreadedQueue (Génériques.Collections) dans un seul producteur de plusieurs consommateurs régime. (Delphi XE). L'idée est de pousser des objets dans une file d'attente et de laisser plusieurs threads de travail vidange de la file d'attente.

Il ne fonctionne pas comme prévu, si. Lorsque deux ou plusieurs threads sont appel PopItem, des violations d'accès sont jetés par les TThreadedQueue.

Si l'appel à PopItem est sérialisé avec une section critique, tout est très bien.

Sûrement le TThreadedQueue doit être capable de gérer plusieurs consommateurs, alors j'ai loupé quelque chose ou est-ce un pur bug dans TThreadedQueue ?

Voici un exemple simple pour produire de l'erreur.

program TestThreadedQueue;

{$APPTYPE CONSOLE}

uses
//  FastMM4 in '..\..\..\FastMM4\FastMM4.pas',
  Windows,
  Messages,
  Classes,
  SysUtils,
  SyncObjs,
  Generics.Collections;

type TThreadTaskMsg =
       class(TObject)
         private
           threadID  : integer;
           threadMsg : string;
         public
           Constructor Create( ID : integer; const msg : string);
       end;

type TThreadReader =
       class(TThread)
         private
           fPopQueue   : TThreadedQueue<TObject>;
           fSync       : TCriticalSection;
           fMsg        : TThreadTaskMsg;
           fException  : Exception;
           procedure DoSync;
           procedure DoHandleException;
         public
           Constructor Create( popQueue : TThreadedQueue<TObject>;
                               sync     : TCriticalSection);
           procedure Execute; override;
       end;

Constructor TThreadReader.Create( popQueue : TThreadedQueue<TObject>;
                                  sync     : TCriticalSection);
begin
  fPopQueue:=            popQueue;
  fMsg:=                 nil;
  fSync:=                sync;
  Self.FreeOnTerminate:= FALSE;
  fException:=           nil;

  Inherited Create( FALSE);
end;

procedure TThreadReader.DoSync ;
begin
  WriteLn(fMsg.threadMsg + ' ' + IntToStr(fMsg.threadId));
end;

procedure TThreadReader.DoHandleException;
begin
  WriteLn('Exception ->' + fException.Message);
end;

procedure TThreadReader.Execute;
var signal : TWaitResult;
begin
  NameThreadForDebugging('QueuePop worker');
  while not Terminated do
  begin
    try
      {- Calling PopItem can return empty without waittime !? Let other threads in by sleeping. }
      Sleep(20);
      {- Serializing calls to PopItem works }
      if Assigned(fSync) then fSync.Enter;
      try
        signal:= fPopQueue.PopItem( TObject(fMsg));
      finally
        if Assigned(fSync) then fSync.Release;
      end;
      if (signal = wrSignaled) then
      begin
        try
          if Assigned(fMsg) then
          begin
            fMsg.threadMsg:= '<Thread id :' +IntToStr( Self.threadId) + '>';
            fMsg.Free; // We are just dumping the message in this test
            //Synchronize( Self.DoSync);
            //PostMessage( fParentForm.Handle,WM_TestQueue_Message,Cardinal(fMsg),0);
          end;
        except
          on E:Exception do begin
          end;
        end;
      end;
      except
       FException:= Exception(ExceptObject);
      try
        if not (FException is EAbort) then
        begin
          {Synchronize(} DoHandleException; //);
        end;
      finally
        FException:= nil;
      end;
   end;
  end;
end;

Constructor TThreadTaskMsg.Create( ID : Integer; Const msg : string);
begin
  Inherited Create;

  threadID:= ID;
  threadMsg:= msg;
end;

var
    fSync : TCriticalSection;
    fThreadQueue : TThreadedQueue<TObject>;
    fReaderArr : array[1..4] of TThreadReader;
    i : integer;

begin
  try
    IsMultiThread:= TRUE;

    fSync:=        TCriticalSection.Create;
    fThreadQueue:= TThreadedQueue<TObject>.Create(1024,1,100);
    try
      {- Calling without fSync throws exceptions when two or more threads calls PopItem
         at the same time }
      WriteLn('Creating worker threads ...');
      for i:= 1 to 4 do fReaderArr[i]:= TThreadReader.Create( fThreadQueue,Nil);
      {- Calling with fSync works ! }
      //for i:= 1 to 4 do fReaderArr[i]:= TThreadReader.Create( fThreadQueue,fSync);
       WriteLn('Init done. Pushing items ...');

      for i:= 1 to 100 do fThreadQueue.PushItem( TThreadTaskMsg.Create( i,''));

      ReadLn;

    finally
      for i:= 1 to 4 do fReaderArr[i].Free;
      fThreadQueue.Free;
      fSync.Free;
    end;

  except
    on E: Exception do
      begin
        Writeln(E.ClassName, ': ', E.Message);
        ReadLn;
      end;
  end;
end.

Mise à jour : L'erreur dans TMonitor qui a causé TThreadedQueue panne est résolu dans Delphi XE2.

Mise à jour 2 : Le test ci-dessus a souligné la file d'attente dans le vide. Darian Miller trouvé que le fait de souligner la file d'attente à pleine état, encore capable de reproduire l'erreur dans XE2. L'erreur est de nouveau dans le TMonitor. Voir sa réponse ci-dessous pour plus d'informations. Et aussi un lien vers le QC101114.

Mise à jour 3: Avec Delphi XE2 mise à jour 4, il était annoncé pour fixer TMonitor qui permettrait de guérir les problèmes en TThreadedQueue. Mes tests ne sont pas en mesure de reproduire les erreurs en TThreadedQueueplus. Testé seul producteur/plusieurs threads consommateurs lors de la file d'attente est vide et le plein. Également testé plusieurs producteurs et plusieurs consommateurs. J'ai varié les threads lecteur et écrivain, fils de 1 à 100, sans aucun problème. Mais connaissant l'histoire, je n'ose les autres à briser TMonitor.

19voto

Mason Wheeler Points 52022

Il est difficile d’être sûr sans beaucoup de tests, mais il semble bien qu’il s’agisse d’un bogue, que ce soit dans TThreadedQueue ou dans TMonitor. De toute façon, c'est dans la RTL et pas votre code. Vous devriez classer ceci en tant que rapport de CQ et utiliser votre exemple ci-dessus comme code "Comment reproduire".

10voto

RBA Points 5595

Je vous recommande d'utiliser OmniThreadLibrary http://www.thedelphigeek.com/search/label/OmniThreadLibrary lorsque vous travaillez avec des threads, du parallélisme, etc. Primoz a fait un très bon travail. Sur le site, vous trouverez une documentation utile. .

meilleures salutations,
Radu

4voto

Darian Miller Points 4915

Votre exemple semble bien fonctionner sous XE2, mais si nous remplir la file d'attente, il échoue avec l'AV sur une PushItem. (Testé sous XE2 Update1)

Pour se reproduire, il suffit d'augmenter votre tâche la création de 100 à 1100 (votre profondeur de file d'attente a été fixée à 1024)

for i:= 1 to 1100 do fThreadQueue.PushItem( TThreadTaskMsg.Create( i,''));

C'est mort pour moi à chaque fois sur Windows 7. J'ai d'abord essayé des efforts continus de test de stress, et il a échoué à la boucle de 30...puis à boucle 16...puis, à 65 ans, de sorte à différents intervalles de temps, mais il a toujours manqué à un certain point.

  iLoop := 0;
  while iLoop < 1000 do
  begin
    Inc(iLoop);
    WriteLn('Loop: ' + IntToStr(iLoop));  
    for i:= 1 to 100 do fThreadQueue.PushItem( TThreadTaskMsg.Create( i,''));
  end;

3voto

Martin James Points 15655

J'ai cherché la classe TThreadedQueue mais ne semble pas l'avoir dans mon D2009. Je ne vais pas vraiment me tuer pour ça - le support des threads Delphi a toujours été err .. errm ... errm ... "non optimal" et je soupçonne que TThreadedQueue n'est pas différent :)

Pourquoi utiliser des génériques pour les objets PC (producteurs / consommateurs)? Un simple descendant de TObjectQueue fera l'affaire - l'utilise depuis des décennies - fonctionne bien avec plusieurs producteurs / consommateurs:

 unit MinimalSemaphorePCqueue;

{ Absolutely minimal P-C queue based on TobjectQueue and a semaphore.

The semaphore count reflects the queue count
'push' will always succeed unless memory runs out, then you're stuft anyway.
'pop' has a timeout parameter as well as the address of where any received
object is to be put.
'pop' returns immediately with 'true' if there is an object on the queue
available for it.
'pop' blocks the caller if the queue is empty and the timeout is not 0.
'pop' returns false if the timeout is exceeded before an object is available
from the queue.
'pop' returns true if an object is available from the queue before the timeout
is exceeded.
If multiple threads have called 'pop' and are blocked because the queue is
empty, a single 'push' will make only one of the waiting threads ready.


Methods to push/pop from the queue
A 'semaHandle' property that can be used in a 'waitForMultipleObjects' call.
When the handle is signaled, the 'peek' method will retrieve the queued object.
}
interface

uses
  Windows, Messages, SysUtils, Classes,syncObjs,contnrs;


type

pObject=^Tobject;


TsemaphoreMailbox=class(TobjectQueue)
private
  countSema:Thandle;
protected
  access:TcriticalSection;
public
  property semaHandle:Thandle read countSema;
  constructor create; virtual;
  procedure push(aObject:Tobject); virtual;
  function pop(pResObject:pObject;timeout:DWORD):boolean;  virtual;
  function peek(pResObject:pObject):boolean;  virtual;
  destructor destroy; override;
end;


implementation

{ TsemaphoreMailbox }

constructor TsemaphoreMailbox.create;
begin
{$IFDEF D2009}
   inherited Create;
{$ELSE}
  inherited create;
{$ENDIF}
  access:=TcriticalSection.create;
  countSema:=createSemaphore(nil,0,maxInt,nil);
end;

destructor TsemaphoreMailbox.destroy;
begin
  access.free;
  closeHandle(countSema);
  inherited;
end;

function TsemaphoreMailbox.pop(pResObject: pObject;
  timeout: DWORD): boolean;
// dequeues an object, if one is available on the queue.  If the queue is empty,
// the caller is blocked until either an object is pushed on or the timeout
// period expires
begin // wait for a unit from the semaphore
  result:=(WAIT_OBJECT_0=waitForSingleObject(countSema,timeout));
  if result then // if a unit was supplied before the timeout,
  begin
    access.acquire;
    try
      pResObject^:=inherited pop; // get an object from the queue
    finally
      access.release;
    end;
  end;
end;

procedure TsemaphoreMailbox.push(aObject: Tobject);
// pushes an object onto the queue.  If threads are waiting in a 'pop' call,
// one of them is made ready.
begin
  access.acquire;
  try
    inherited push(aObject); // shove the object onto the queue
  finally
    access.release;
  end;
  releaseSemaphore(countSema,1,nil); // release one unit to semaphore
end;

function TsemaphoreMailbox.peek(pResObject: pObject): boolean;
begin
  access.acquire;
  try
    result:=(count>0);
    if result then pResObject^:=inherited pop; // get an object from the queue
  finally
    access.release;
  end;
end;
end.
 

1voto

Giel Points 1500

Je ne pense pas que TThreadedQueue est supposé prendre en charge plusieurs consommateurs. C'est une FIFO, selon le fichier d'aide. J'ai l'impression qu'il y a un fil qui pousse et un autre (juste un!) Qui éclate.

Prograide.com

Prograide est une communauté de développeurs qui cherche à élargir la connaissance de la programmation au-delà de l'anglais.
Pour cela nous avons les plus grands doutes résolus en français et vous pouvez aussi poser vos propres questions ou résoudre celles des autres.

Powered by:

X