unit Threading.ProcQueue;
interface
uses
System.Generics.Collections,
System.SysUtils,
System.Threading;
type
  // Runs TProc procedures as tasks with an upper bound on how many are active
  // at once. Procedures added while the bound is reached are held in a FIFO
  // queue and started as running tasks finish.
  TProcQueue = class
  private
    // Set by Destroy; Add refuses new work and finished tasks stop pulling
    // queued procedures once it is True.
    FShutdown: Boolean;
    // Upper bound on the number of entries in FTaskList.
    FMaxParallel: Integer;
    // Monitor object guarding FShutdown, FProcQueue and FTaskList.
    FSync: TObject;
    // Procedures waiting for a free slot.
    FProcQueue: TQueue<TProc>;
    // Tasks that have been started and have not yet reported completion.
    FTaskList: TList<ITask>;
    // Wraps AProc in a task, registers the task and starts it.
    procedure Execute(const AProc: TProc);
    // Unregisters ATask and starts the next queued procedure, if any.
    procedure TaskHasFinished(const ATask: ITask);
  public
    // MaxParallel is the maximum number of simultaneously active tasks.
    constructor Create(const MaxParallel: Integer);
    // Discards queued procedures and waits for the active tasks.
    destructor Destroy; override;
    // Runs AProc right away if a slot is free, otherwise queues it.
    procedure Add(const AProc: TProc);
  end;
implementation
{ TProcQueue }
// Schedules AProc for execution.
//
// While fewer than FMaxParallel tasks are registered in FTaskList the
// procedure is started immediately as a new task; otherwise it is appended to
// FProcQueue and started later by TaskHasFinished.
//
// Raises EInvalidOpException when the queue is shutting down.
procedure TProcQueue.Add(
  const AProc: TProc );
begin
  TMonitor.Enter( FSync );
  try
    // The flag is tested while holding FSync because Destroy sets it under
    // the same lock. Testing it before acquiring the lock would let a
    // procedure pass the check and still be scheduled after shutdown began.
    if FShutdown then
      raise EInvalidOpException.Create( 'we are going down' );

    if FTaskList.Count < FMaxParallel then
      Execute( AProc )
    else
      FProcQueue.Enqueue( AProc );
  finally
    TMonitor.Exit( FSync );
  end;
end;
// Creates an empty queue.
//
// MaxParallel is the maximum number of tasks allowed to be active at the same
// time. Values below 1 are treated as 1: with a limit of zero or less, Add
// would never start a task, so every procedure would stay queued and be
// discarded unexecuted on destruction.
constructor TProcQueue.Create(
  const MaxParallel: Integer );
begin
  inherited Create;

  if MaxParallel < 1 then
    FMaxParallel := 1
  else
    FMaxParallel := MaxParallel;

  FSync      := TObject.Create;
  FProcQueue := TQueue<TProc>.Create;
  FTaskList  := TList<ITask>.Create;
end;
// Shuts the queue down: flags the shutdown, drops every procedure that has
// not been started yet, waits for the tasks that are still active and then
// releases the internal objects.
destructor TProcQueue.Destroy;
var
  active: TArray<ITask>;
begin
  // FSync (and the containers) are nil if the constructor raised before
  // creating them; the destructor still runs in that case.
  if FSync <> nil then
  begin
    TMonitor.Enter( FSync );
    try
      FShutdown := True;
      if FProcQueue <> nil then
        FProcQueue.Clear;
      // Snapshot the list while holding the lock: finishing tasks remove
      // themselves from FTaskList under FSync, so copying it unlocked would
      // race with those removals.
      if FTaskList <> nil then
        active := FTaskList.ToArray;
    finally
      TMonitor.Exit( FSync );
    end;

    // The wait happens outside the lock because each task needs FSync in
    // TaskHasFinished before it can complete.
    if Length( active ) > 0 then
      try
        TTask.WaitForAll( active );
      except
        // Deliberately ignored: failures of the queued procedures are of no
        // interest during teardown, and a destructor must not raise.
      end;
  end;

  FTaskList.Free;
  FProcQueue.Free;
  FSync.Free;
  inherited;
end;
// Wraps AProc in a task, registers the task in FTaskList and starts it.
// When AProc returns or raises, the task reports itself through
// TaskHasFinished.
//
// FTaskList is modified without taking FSync here; both callers in this unit
// (Add and TaskHasFinished) already hold it.
procedure TProcQueue.Execute(
  const AProc: TProc );
var
  task    : ITask; // captured by the anonymous method below
  launched: ITask; // not captured; keeps the task alive until Start returns
begin
  task := TTask.Create(
    procedure
    begin
      try
        AProc( );
      finally
        try
          TaskHasFinished( task );
        finally
          // The task owns this anonymous method and the anonymous method
          // captures the variable holding the task. Clearing the captured
          // variable breaks that reference cycle so the task can be freed.
          task := nil;
        end;
      end;
    end );

  // From here on only the uncaptured reference is used: the task may finish
  // and clear the captured variable before Start has returned.
  launched := task;
  FTaskList.Add( launched );
  launched.Start;
end;
// Called by a task once its procedure has returned or raised.
// Takes ATask out of the list of active tasks and, unless the queue is
// shutting down or nothing is waiting, starts the oldest queued procedure.
procedure TProcQueue.TaskHasFinished(const ATask: ITask);
begin
  TMonitor.Enter(FSync);
  try
    FTaskList.Remove(ATask);

    // Nothing more to do during shutdown or when no procedure is waiting.
    // Exit still passes through the finally section and releases the lock.
    if FShutdown or (FProcQueue.Count <= 0) then
      Exit;

    Execute(FProcQueue.Dequeue());
  finally
    TMonitor.Exit(FSync);
  end;
end;
end.