unit Unit1;
interface
uses
System.SysUtils,
System.Classes,
System.SyncObjs,
System.Generics.Collections;
type
/// <summary>
///   Worker thread that hands queued items of type T to a callback.
///   Items are added through Enqueue and later passed, one by one, to the
///   procedure supplied to the constructor from inside Execute.
/// </summary>
TMessageQueueThread<T> = class( TThread )
private
  // Callback invoked for every item taken from the queue.
  FQueueProc: TProc<T>;
  // FInQueue receives new items; FOutQueue is the queue drained by Execute.
  FInQueue, FOutQueue: TQueue<T>;
  // Signalled by Enqueue and TerminatedSet to wake up Execute.
  FEvent: TEvent;
protected
  // Thread body: waits on FEvent, swaps the queues and processes the items.
  procedure Execute; override;
  // Passes every item of AQueue to FQueueProc and removes it afterwards.
  procedure DoProcessQueue( AQueue: TQueue<T> );
  // Signals FEvent so that a waiting Execute notices the termination.
  procedure TerminatedSet; override;
public
  // AQueueProc is stored and called for each enqueued item.
  constructor Create( AQueueProc: TProc<T> );
  destructor Destroy; override;
  // Appends AItem to the input queue and signals the event.
  procedure Enqueue( const AItem: T );
end;
/// <summary>
///   Producer thread: its Execute loop formats the current time as a string
///   and enqueues it into an internal TMessageQueueThread of string, which
///   forwards each message to the procedure given to the constructor.
/// </summary>
TReceiveThread = class( TThread )
private
  // Queue thread created in Create and freed in Destroy.
  FMessageQueue: TMessageQueueThread<string>;
  // Format settings passed to FormatDateTime in Execute.
  FFMTset: TFormatSettings;
protected
  // Thread body: produces timestamp messages until Terminated is set.
  procedure Execute; override;
public
  // AProc is handed to the internal message queue as its item callback.
  constructor Create( AProc: TProc<string> );
  destructor Destroy; override;
end;
implementation
{ TMessageQueueThread<T> }
// Creates the queue thread.
//   AQueueProc - callback that Execute invokes for every enqueued item.
// The thread is created non-suspended; the wake-up event and both queues are
// created right after the inherited constructor.
constructor TMessageQueueThread<T>.Create( AQueueProc: TProc<T> );
begin
  inherited Create( False );
  // Auto-reset event (ManualReset = False), initially non-signalled.
  // NOTE(review): the name literal was split across two lines in the source,
  // which is an unterminated string; restored as the empty string (unnamed
  // event) - confirm that no named event was intended.
  FEvent := TEvent.Create( nil, False, False, '' );
  FQueueProc := AQueueProc;
  FInQueue := TQueue<T>.Create;
  FOutQueue := TQueue<T>.Create;
end;
// Releases the event and both queues.
// The inherited destructor runs first on purpose: the owned objects are only
// freed after it has returned (presumably after TThread.Destroy has stopped
// the thread - verify against the RTL version in use).
destructor TMessageQueueThread<T>.Destroy;
begin
  inherited Destroy;
  // The three objects are independent of each other; Free is nil-safe.
  FEvent.Free;
  FOutQueue.Free;
  FInQueue.Free;
end;
// Drains AQueue: every item is handed to FQueueProc and then removed.
//   AQueue - queue whose items are processed in FIFO order.
procedure TMessageQueueThread<T>.DoProcessQueue( AQueue: TQueue<T> );
var
  LHead: T;
begin
  while AQueue.Count > 0 do
  begin
    // Look at the head without removing it, so that the item is still in
    // the queue if the callback raises.
    LHead := AQueue.Peek;
    FQueueProc( LHead );
    AQueue.Dequeue;
  end;
end;
// Appends AItem to the input queue and wakes the worker thread.
//   AItem - item to be passed to the queue callback later on.
// TQueue<T> is not thread-safe, so access to FInQueue is serialized with the
// queue swap in Execute through the monitor of this instance.
procedure TMessageQueueThread<T>.Enqueue( const AItem: T );
begin
  System.TMonitor.Enter( Self );
  try
    FInQueue.Enqueue( AItem );
  finally
    System.TMonitor.Exit( Self );
  end;
  // Signal outside the lock; the worker takes the lock itself for the swap.
  FEvent.SetEvent;
end;

// Thread body: waits until FEvent is signalled, swaps input and output queue
// and passes every item of the output queue to FQueueProc.
// Leaves without processing pending items once Terminated is set.
procedure TMessageQueueThread<T>.Execute;
var
  LSwap: TQueue<T>;
begin
  while not Terminated do
  begin
    FEvent.WaitFor;
    if Terminated then
      Exit;

    // Swap the queues. This must be done under the same lock as Enqueue:
    // an atomic pointer exchange alone does not stop a producer that has
    // already read FInQueue from still writing into the queue instance
    // that is about to be drained here.
    System.TMonitor.Enter( Self );
    try
      LSwap := FInQueue;
      FInQueue := FOutQueue;
      FOutQueue := LSwap;
    finally
      System.TMonitor.Exit( Self );
    end;

    // Process the queue; only this thread touches FOutQueue from here on.
    DoProcessQueue( FOutQueue );
    // Clear the queue just in case; it should already be empty by now.
    FOutQueue.Clear;
  end;
end;
// Called when the thread is asked to terminate: signals FEvent so that an
// Execute loop blocked in WaitFor wakes up and sees Terminated.
procedure TMessageQueueThread<T>.TerminatedSet;
begin
  inherited;
  // FEvent is still nil if the constructor failed before creating it and
  // Terminate is reached while the half-built instance is being destroyed.
  if Assigned( FEvent ) then
    FEvent.SetEvent;
end;
{ TReceiveThread }
// Creates the receive thread together with its internal message queue.
//   AProc - callback handed to the message queue; it is called for every
//           message enqueued by Execute.
constructor TReceiveThread.Create( AProc: TProc<string> );
begin
  inherited Create( False );
  // NOTE(review): the locale-name literal was split across two lines in the
  // source, which is an unterminated string; restored as the empty string -
  // confirm which locale was intended.
  FFMTset := TFormatSettings.Create( '' );
  FMessageQueue := TMessageQueueThread<string>.Create( AProc );
end;
// Frees the internal message queue.
// The inherited destructor is deliberately called first, so the queue is
// only released after it has returned (presumably once this thread's Execute
// can no longer call Enqueue - verify against the RTL version in use).
destructor TReceiveThread.Destroy;
begin
  inherited Destroy;
  FMessageQueue.Free;
end;
// Thread body: until Terminated is set, formats the current time with
// millisecond resolution and enqueues it as a message.
// The loop does not pause between messages.
procedure TReceiveThread.Execute;
var
  LMsg: string;
begin
  while not Terminated do
  begin
    // The format literal was split across two lines in the source (an
    // unterminated string); restored as a single-line literal.
    LMsg := FormatDateTime( 'hh:nn:ss.zzz', Now, FFMTset );
    FMessageQueue.Enqueue( LMsg );
  end;
end;
end.