unit MyBufferFillerThread;
interface
uses
Windows, Classes, SyncObjs, Generics.Collections;
const
cMyBufferFillerThreadDefaultBufferSize = $400000;
// Default total buffer size in bytes: $400000 = 1 shl 22 = 4 MiB
cMyBufferFillerThreadDefaultBlockSize = $8000;
// Default size of one block in bytes: $8000 = 1 shl 15 = 32 KiB
type
  { Worker thread that reads a stream block by block into one pre-allocated
    buffer. Filled blocks are handed to the consumer through FReadQueue; the
    consumer returns them through FWriteQueue once Read has copied them out. }
  TMyBufferFillerThread = class(TThread)
  private
    type
      // Descriptor of one block-sized slice of the shared buffer.
      TMyBlockItem = record
        Data : pointer;      // start address of the slice inside FBuffer
        Size : integer;      // number of bytes the stream read put into the slice
        LastBlock : boolean; // set by Execute on the block that reaches FSize
      end;
      TMyBlockItemQueue = TQueue<TMyBlockItem>;
  private
    var
      FBlockSize : integer;            // size of a single block in bytes
      FBufferSize : integer;           // total buffer length, a multiple of FBlockSize
      FBuffer : Pointer;               // the buffer that all block items point into
      FBlocksCount : integer;          // FBufferSize div FBlockSize
      FStream : TStream;               // source stream the worker reads from
      FOwnsStream : boolean;           // when True the destructor frees FStream
      FReadQueue : TMyBlockItemQueue;  // filled blocks waiting for Read
      FWriteQueue : TMyBlockItemQueue; // free blocks waiting to be filled
      FCrtSec : TCriticalSection;      // guards both queues
      FPosition : int64;               // stream position at setup, advanced by Read
      FSize : int64;                   // stream size captured at setup
      FEndOfFile : boolean;            // True once Read has returned the last block
    // Worker side: publish a freshly filled block to the read queue.
    procedure BlockLoaded(var Item: TMyBlockItem);
    // Consumer side: hand a consumed block back to the write queue.
    procedure BlockRead(var Item: TMyBlockItem);
    // True when at least one filled block is waiting in the read queue.
    function CanReadBlock: boolean;
    // True when at least one free block is waiting in the write queue.
    function CanWriteBlock: boolean;
  protected
    // Thread body: fills free blocks from the stream.
    procedure Execute; override;
    // (Re)allocates the buffer and resets stream, counters and both queues.
    procedure SetThreadParameters(
      const Stream: TStream;
      const OwnsStream: boolean;
      const BufferSize: integer;
      const BlockSize: integer);
  public
    // Factory: builds a suspended, fully configured instance.
    class function CreateBufferFillerThread(
      const Stream: TStream;
      const OwnsStream: boolean = True;
      const BufferSize: integer = cMyBufferFillerThreadDefaultBufferSize;
      const BlockSize: integer = cMyBufferFillerThreadDefaultBlockSize): TMyBufferFillerThread;
    destructor Destroy; override;
    // Copies the next filled block (at most FBlockSize bytes) into Buffer and
    // returns the number of bytes copied, in the manner of TStream.Read.
    function Read(var Buffer): integer;
    property Position : int64 read FPosition;
    property Size : int64 read FSize;
    property EndOfFile : boolean read FEndOfFile;
  end;
implementation
{ TMyBufferFillerThread }
{ Factory for the buffer filler thread.

  Creates the thread suspended, allocates the lock and both queues and then
  configures it through SetThreadParameters. The caller has to start the
  returned thread itself.

  Stream      - stream to read from
  OwnsStream  - when True the thread frees Stream in its destructor
  BufferSize  - requested total buffer size in bytes
  BlockSize   - size of one block in bytes

  If any step of the setup raises, the partially built thread is freed before
  the exception is re-raised, so nothing leaks. }
class function TMyBufferFillerThread.CreateBufferFillerThread(
  const Stream: TStream;
  const OwnsStream: boolean;
  const BufferSize: integer;
  const BlockSize: integer): TMyBufferFillerThread;
var
  Thread: TMyBufferFillerThread;
begin
  Thread := TMyBufferFillerThread.Create(True);
  try
    // Explicit qualification instead of "with": no risk of silently binding
    // an identifier to the wrong scope.
    Thread.FreeOnTerminate := False;
    Thread.FBuffer := nil;
    Thread.FCrtSec := TCriticalSection.Create;
    Thread.FReadQueue := TMyBlockItemQueue.Create;
    Thread.FWriteQueue := TMyBlockItemQueue.Create;
    Thread.SetThreadParameters(Stream, OwnsStream, BufferSize, BlockSize);
  except
    Thread.Free;
    raise;
  end;
  Result := Thread;
end;
{ (Re)configures the thread: allocates the buffer, captures the stream state
  and puts every block of the buffer into the write (free) queue.

  Stream      - stream to read from; its Position and Size are captured here
  OwnsStream  - when True the destructor frees Stream
  BufferSize  - requested buffer size; rounded down to a whole number of
                blocks and then extended by one extra block
  BlockSize   - size of one block in bytes (a value of 0 makes the division
                below fail) }
procedure TMyBufferFillerThread.SetThreadParameters(
  const Stream: TStream;
  const OwnsStream: boolean;
  const BufferSize: integer;
  const BlockSize: integer);
var
  i: integer;
  itm: TMyBlockItem;
begin
  if FBuffer <> nil then
  begin
    FreeMemory(FBuffer);
    // Clear the pointer right away: if anything below raises, the destructor
    // must not free the same memory a second time.
    FBuffer := nil;
  end;
  FBlockSize := BlockSize;
  // Keep the buffer a multiple of the block size.
  FBufferSize := BufferSize div FBlockSize * FBlockSize + FBlockSize;
  FBuffer := GetMemory(FBufferSize);
  // Maximum number of blocks that can be cached.
  FBlocksCount := FBufferSize div FBlockSize;
  FStream := Stream;
  FOwnsStream := OwnsStream;
  FPosition := FStream.Position;
  FSize := FStream.Size;
  FEndOfFile := False;
  FReadQueue.Clear;
  FWriteQueue.Clear;
  for i := 0 to FBlocksCount - 1 do
  begin
    // Pointer-sized arithmetic: casting the pointer to a 32-bit integer would
    // truncate the address on 64-bit targets.
    itm.Data := Pointer(NativeUInt(FBuffer) + NativeUInt(i) * NativeUInt(FBlockSize));
    itm.Size := 0;
    itm.LastBlock := False;
    FWriteQueue.Enqueue(itm);
  end;
end;
{ Stops the worker and releases the buffer, the stream (when owned), the lock
  and both queues.

  The worker must have left Execute before any of these resources are freed,
  otherwise it could still be reading into FBuffer or touching the queues. }
destructor TMyBufferFillerThread.Destroy;
begin
  // Skip the shutdown when no OS thread exists, or when the destructor runs on
  // the worker thread itself (Execute has already returned in that case and
  // waiting for ourselves would never finish).
  if (ThreadID <> 0) and (GetCurrentThreadId <> ThreadID) then
  begin
    Terminate;
    // The worker parks itself with Suspended := True while no free block is
    // available (and is also suspended if it was never started); wake it so it
    // can observe Terminated and leave Execute.
    // NOTE(review): a worker that suspends itself right after this check would
    // still block WaitFor - an event-based wait in Execute would remove that.
    if Suspended then
      Suspended := False;
    WaitFor;
  end;
  if FBuffer <> nil then
    FreeMemory(FBuffer);
  if FOwnsStream then
    FStream.Free;
  FCrtSec.Free;
  FReadQueue.Free;
  FWriteQueue.Free;
  inherited;
end;
{ Publishes a freshly filled block: appends Item to the read queue under the
  critical section. The lock is released even if Enqueue raises. }
procedure TMyBufferFillerThread.BlockLoaded(var Item: TMyBlockItem);
begin
  FCrtSec.Enter;
  try
    FReadQueue.Enqueue(Item);
  finally
    FCrtSec.Leave;
  end;
end;
{ Returns a consumed block to the pool: appends Item to the write (free) queue
  under the critical section and resumes the worker if it is suspended.
  The lock is released even if Enqueue raises. }
procedure TMyBufferFillerThread.BlockRead(var Item: TMyBlockItem);
begin
  FCrtSec.Enter;
  try
    FWriteQueue.Enqueue(Item);
  finally
    FCrtSec.Leave;
  end;
  // Execute suspends the thread when it runs out of free blocks; one is
  // available again now.
  if Suspended then
    Suspended := False;
end;
{ Reports whether at least one filled block is waiting in the read queue.
  The count is sampled while holding the critical section. }
function TMyBufferFillerThread.CanReadBlock: boolean;
var
  pending: integer;
begin
  FCrtSec.Acquire;
  pending := FReadQueue.Count;
  FCrtSec.Release;
  Result := pending > 0;
end;
{ Reports whether at least one free block is waiting in the write queue.
  The count is sampled while holding the critical section. }
function TMyBufferFillerThread.CanWriteBlock: boolean;
var
  available: integer;
begin
  FCrtSec.Acquire;
  available := FWriteQueue.Count;
  FCrtSec.Release;
  Result := available > 0;
end;
{ Copies the next filled block into Buffer and returns the number of bytes
  copied (at most FBlockSize), in the manner of TStream.Read.

  Buffer - caller-supplied memory; it must have room for FBlockSize bytes.

  Returns 0 when
    - the last block has already been delivered (EndOfFile is True),
    - Position has reached Size, so the worker has nothing to deliver
      (EndOfFile is set to True), or
    - the thread was terminated and no filled block is left.
  Otherwise the call waits, polling every 5 ms, until a block is available. }
function TMyBufferFillerThread.Read(var Buffer): integer;
var
  itm: TMyBlockItem;
begin
  Result := 0;
  if FEndOfFile then // nothing left
    Exit;
  // Execute only loads data while its position is below FSize. When nothing
  // remains (e.g. an empty stream) no block will ever arrive, so report end
  // of file instead of waiting forever.
  if FPosition >= FSize then
  begin
    FEndOfFile := True;
    Exit;
  end;
  while not CanReadBlock do
  begin
    // A terminated worker produces no further blocks; do not wait for them.
    if Terminated then
      Exit;
    Sleep(5);
  end;
  FCrtSec.Enter;
  try
    itm := FReadQueue.Dequeue;
  finally
    FCrtSec.Leave;
  end;
  Result := itm.Size;
  FEndOfFile := itm.LastBlock;
  CopyMemory(@Buffer, itm.Data, Result);
  // Hand the block back to the worker only after its data has been copied.
  BlockRead(itm);
  Inc(FPosition, itm.Size);
end;
{ Thread body: repeatedly takes a free block from the write queue, fills it
  from the stream and publishes it through the read queue, until the captured
  stream size is reached, the stream stops delivering data, or the thread is
  terminated. }
procedure TMyBufferFillerThread.Execute;
var
  itm: TMyBlockItem;
  s, p: int64;
begin
  s := FSize;
  p := FPosition;
  while (p < s) and (not Terminated) do
  begin
    // Park until the consumer returns a block. Re-check after every wake-up:
    // a resume that did not come from BlockRead must not lead to a Dequeue on
    // an empty queue.
    while (not CanWriteBlock) and (not Terminated) do
      Suspended := True;
    if Terminated then
      Exit;
    FCrtSec.Enter;
    try
      itm := FWriteQueue.Dequeue;
    finally
      FCrtSec.Leave;
    end;
    itm.Size := FStream.Read(itm.Data^, FBlockSize);
    Inc(p, itm.Size);
    // The block is the last one when the captured size is reached (or passed)
    // or when the stream returned no data; without the second condition a
    // stream that ends early would keep this loop spinning forever.
    itm.LastBlock := (p >= s) or (itm.Size <= 0);
    BlockLoaded(itm);
    if itm.LastBlock then
      Break;
  end;
end;
end.