Einzelnen Beitrag anzeigen

AJ_Oldendorf

Registriert seit: 12. Jun 2009
440 Beiträge
 
Delphi 12 Athens
 
#56

AW: schnelle Server Client Verbindung ohne Verluste

  Alt 8. Apr 2025, 14:02
Danke Sebastian, guck mal drauf.

So sieht es jetzt aktuell aus, habe beim Client noch den Empfang eingebaut aber der läuft noch nicht. Da bin ich noch auf der Suche.
Wie gesagt, bitte Nachsicht bei Benennung und Struktur haben, dass ist nur grob zusammengeworfen.

Beim Client muss ich auch noch gucken, da steht im Memo immer nach dem Start, dass UseNagle aktiv wäre, obwohl es ausgeschaltet ist. Das suche ich auch noch. Hauptproblem ist aber wirklich das Write im Server

Client:
Delphi-Quellcode:
unit Unit1;

interface

uses
  Winapi.Windows, Winapi.Messages, System.SysUtils, System.Variants, System.Classes, Vcl.Graphics,
  Vcl.Controls, Vcl.Forms, Vcl.Dialogs, IdIOHandler, IdIOHandlerSocket,
  IdIOHandlerStack, IdBaseComponent, IdComponent, IdTCPConnection, IdTCPClient,
  System.SyncObjs, IdContext, IdGlobal, System.Generics.Collections,
  System.Diagnostics, Vcl.StdCtrls, Vcl.ExtCtrls;

type
  TMyTCPClient = class;

  TDataRec = record
    Daten : TIdBytes;
    Context : TIdContext;
  end;

  TReceiveEvent = procedure(Sender: TObject; aData : TDataRec) of Object;

  TDataQueue = class
  private
    FQueue: TQueue<TDataRec>;
    FLock: TCriticalSection;
  public
    constructor Create;
    destructor Destroy; override;
    procedure Enqueue(const Data: TDataRec);
    function Dequeue: TDataRec;
  end;

  TProcessingThread = class(TThread)
  private
    FDataQueue: TDataQueue;

    Anz : LongWord;
  protected
    procedure Execute; override;
  public
    OnReceive : TReceiveEvent;

    constructor Create(ADataQueue: TDataQueue);
  end;

  TReceiveThread = class(TThread)
  private
    FDataQueue: TDataQueue;
    FParent : TMyTCPClient;
    PrtGes : Boolean;

    Anz : LongWord;
  protected
    procedure Execute; override;
  public
    constructor Create(aParent : TMyTCPClient; ADataQueue: TDataQueue);
  end;

  TMyTCPClient = class
  private
    FDataQueue : TDataQueue;
    FProcessingThread: TProcessingThread;

    FReceiveThread: TReceiveThread;

    FParentClient : TIdTCPClient;
    FForm : TForm;

    procedure OnClientReadData(Sender: TObject; aData : TDataRec);
  public
    constructor Create(aForm : TForm);
    destructor Destroy; override;
    procedure MyConnect(const AHost: string; APort: Integer);
    procedure Disconnect;
    procedure SendData(const Data: TDataRec);
  end;

  TForm1 = class(TForm)
    IdTCPClient1: TIdTCPClient;
    IdIOHandlerStack1: TIdIOHandlerStack;
    Memo1: TMemo;
    UpdateTimer: TTimer;
    Button1: TButton;
    procedure UpdateTimerTimer(Sender: TObject);
    procedure FormCreate(Sender: TObject);
    procedure FormDestroy(Sender: TObject);
    procedure Button1Click(Sender: TObject);
  private
    { Private-Deklarationen }
    MyClient: TMyTCPClient;
    SL : TStringList;
  public
    { Public-Deklarationen }
    procedure Log(aStr : String);
  end;

var
  Form1: TForm1;

implementation

{$R *.dfm}

{ TDataQueue }

constructor TDataQueue.Create;
begin
  FQueue := TQueue<TDataRec>.Create;
  FLock := TCriticalSection.Create;
end;

destructor TDataQueue.Destroy;
begin
  FQueue.Free;
  FLock.Free;
  inherited;
end;

function TDataQueue.Dequeue: TDataRec;
begin
  FLock.Acquire;
  try
    if FQueue.Count > 0 then
      Result := FQueue.Dequeue
    else
    begin
      SetLength(Result.Daten, 0);
      Result.Context := Nil;
    end;
  finally
    FLock.Release;
  end;
end;

procedure TDataQueue.Enqueue(const Data: TDataRec);
begin
  FLock.Acquire;
  try
    FQueue.Enqueue(Data);
  finally
    FLock.Release;
  end;
end;

{ TProcessingThread }

constructor TProcessingThread.Create(ADataQueue: TDataQueue);
begin
  FDataQueue := ADataQueue;
  Anz := 0;
  inherited Create(False);
end;

procedure TProcessingThread.Execute;
var
  Data: TDataRec;
begin
  while not Terminated do
  begin
    Data := FDataQueue.Dequeue;
    if Length(Data.Daten) > 0 then
    begin
      if Assigned(OnReceive) then
        OnReceive(Self, Data);
    end
    else
      Sleep(1);
  end;
end;

{ TReceiveThread }

constructor TReceiveThread.Create(aParent: TMyTCPClient; ADataQueue: TDataQueue);
begin
  FDataQueue := ADataQueue;
  FParent := aParent;

  if FParent.FParentClient.UseNagle then
    Sleep(1);

  PrtGes := True;

  Anz := 0;
  inherited Create(False);
end;

procedure TReceiveThread.Execute;
var
  Buffer : TIdBytes;
  RecData : TDataRec;
begin
  while not Terminated do
  begin
    if Assigned(FParent) and Assigned(FParent.FParentClient) then
    begin
      if FParent.FParentClient.UseNagle then
        TForm1(FParent.FForm).Log('01-Client(TReceiveThread): UseNagle aktiv');

      if FParent.FParentClient.IOHandler.InputBuffer.Size > 0 then
      begin
        while FParent.FParentClient.IOHandler.InputBuffer.Size > 0 do
        begin
          SetLength(Buffer, FParent.FParentClient.IOHandler.InputBuffer.Size);
          FParent.FParentClient.IOHandler.ReadBytes(Buffer, Length(Buffer), False);

          //Daten in Verarbeitungsliste aufnehmen
          RecData.Daten := Buffer;
          RecData.Context := Nil;

          FDataQueue.Enqueue(RecData);
        end;
      end
      else
        Sleep(1);
    end;
  end;
end;

{ TMyTCPClient }

procedure TMyTCPClient.MyConnect(const AHost: string; APort: Integer);
begin
  FParentClient.Host := AHost;
  FParentClient.Port := APort;
  FParentClient.ConnectTimeout := 5000; // 5 Sekunden Timeout
  FParentClient.ReadTimeout := 5000; // 5 Sekunden Timeout für Lesevorgänge
  FParentClient.UseNagle := False;
  FParentClient.Connect;
  TForm1(FForm).Log('Verbunden mit ' + AHost + ':' + APort.ToString);
end;

constructor TMyTCPClient.Create(aForm : TForm);
begin
  FForm := aForm;

  FParentClient := TForm1(FForm).IdTCPClient1;

  if FParentClient.UseNagle then
    Sleep(1);

  FDataQueue := TDataQueue.Create;

  //wird nur beim Slave genutzt
  FProcessingThread := TProcessingThread.Create(FDataQueue);
  FProcessingThread.OnReceive := OnClientReadData;

  FReceiveThread := TReceiveThread.Create(Self, FDataQueue);
end;

destructor TMyTCPClient.Destroy;
begin
  if Assigned(FReceiveThread) then
    FreeAndNil(FReceiveThread);

  if Assigned(FProcessingThread) then
    FreeAndNil(FProcessingThread);

  if Assigned(FDataQueue) then
    FreeAndNil(FDataQueue);

  Disconnect;
  inherited;
end;

procedure TMyTCPClient.Disconnect;
begin
  if FParentClient.Connected then
  begin
    FParentClient.Disconnect;
    TForm1(FForm).Log('Verbindung getrennt.');
  end;
end;

procedure TMyTCPClient.SendData(const Data: TDataRec);
begin
  if FParentClient.Connected then
  begin
    if FParentClient.UseNagle then
      TForm1(FForm).Log('01-Client(SendData): UseNagle aktiv');

    FParentClient.IOHandler.WriteDirect(Data.Daten);
    //TForm1(FForm).Log(Now, ' Gesendet: ', Length(Data), ' Bytes');
  end
  else
  begin
    FParentClient.Connect;
    //TForm1(FForm).Log('Fehler: Nicht verbunden.');
  end;
end;

procedure TMyTCPClient.OnClientReadData(Sender: TObject; aData : TDataRec);
var
  IData : AnsiString;
begin
  if not Assigned(FParentClient) then
    Exit;

  SetLength(IData,Length(aData.Daten));
  Move(aData.Daten[0],IData[1],Length(aData.Daten));

  //irgendwas mit den Daten machen...
end;

procedure TForm1.Button1Click(Sender: TObject);
var
  TestData: TDataRec;
  Anz : LongWord;
begin
  if not Assigned(MyClient) then
    Exit;

  var sw3 := TStopwatch.StartNew;
  var t3 : Int64;

  SetLength(TestData.Daten, 61000); //1024
  FillChar(TestData.Daten[0], Length(TestData.Daten), 65);

  TestData.Context := Nil;

  Anz := 0;

  for var i := 1 to 200 do
  begin
    Inc(Anz, Length(TestData.Daten));

    MyClient.SendData(TestData);
  end;

  t3 := sw3.ElapsedMilliseconds; //Zeitmessung stoppen
  Log('Zeitdauer: ' + t3.ToString + ' ms');

  Log('Gesamtlänge: ' + Anz.ToString + ' Bytes');
end;

procedure TForm1.FormCreate(Sender: TObject);
begin
  SL := TStringList.Create;
  Memo1.Clear;

  IdTCPClient1.UseNagle := False;

  try
    MyClient := TMyTCPClient.Create(Self);
    try
      MyClient.MyConnect('127.0.0.1', 5000);
    finally

    end;
  except
    on E: Exception do
      Log('Fehler: ' + E.Message);
  end;
end;

procedure TForm1.FormDestroy(Sender: TObject);
begin
  MyClient.Disconnect;
  FreeAndNil(MyClient);
  FreeAndNil(SL);
end;

procedure TForm1.Log(aStr : String);
begin
  SL.Add(aStr);

  if UpdateTimer.Enabled then
    Exit;

  UpdateTimer.Enabled := True;
end;

procedure TForm1.UpdateTimerTimer(Sender: TObject);
begin
  UpdateTimer.Enabled := False;

  Memo1.Lines.Text := SL.Text;
end;

end.
Server:
Delphi-Quellcode:
unit Unit1;

interface

uses
  Winapi.Windows, Winapi.Messages, System.SysUtils, System.Variants,
  System.Classes, Vcl.Graphics, Vcl.Controls, Vcl.Forms, Vcl.Dialogs,
  IdServerIOHandler, IdServerIOHandlerSocket, IdServerIOHandlerStack,
  IdBaseComponent, IdComponent, IdCustomTCPServer, IdTCPServer, System.SyncObjs,
  System.Generics.Collections, System.Diagnostics, IdGlobal, IdContext,
  Vcl.StdCtrls, Vcl.ExtCtrls;

type
  TMyTCPServer = class;

  TDataRec = record
    Daten : TIdBytes;
    Context : TIdContext;
  end;

  TReceiveEvent = procedure(Sender: TObject; aData : TDataRec) of Object;

  TDataQueue = class
  private
    FQueue: TQueue<TDataRec>;
    FLock: TCriticalSection;
  public
    constructor Create;
    destructor Destroy; override;
    procedure Enqueue(const Data: TDataRec);
    function Dequeue: TDataRec;
  end;

  TProcessingThread = class(TThread)
  private
    FDataQueue: TDataQueue;

    Anz : LongWord;

    LastPrt : String;

    procedure Log;
  protected
    procedure Execute; override;
  public
    OnReceive : TReceiveEvent;

    constructor Create(ADataQueue: TDataQueue);
  end;

  TSendeThread = class(TThread)
  private
    FDataQueue: TDataQueue;
    FParent : TMyTCPServer;
    PrtGes : Boolean;

    Anz : LongWord;
    LastPrt : String;

    procedure Log;
  protected
    procedure Execute; override;
  public
    constructor Create(aParent : TMyTCPServer; ADataQueue: TDataQueue);
  end;

  TMyTCPServer = class
  private
    FDataQueue: TDataQueue;
    FSendeDataQueue : TDataQueue;
    FParentServer : TIdTCPServer;
    FForm : TForm;

    FProcessingThread: TProcessingThread;
    FSendeThread: TSendeThread;
    FAnzEmpfang : LongWord;
    FBytesEmpfang : LongWord;

    ReadingIsActiv : Boolean;

    LastRecData : TDataRec;

    LastPrt : String;

    LastContext : TIdContext;

    procedure Log;

    procedure OnExecuteHandler(AContext: TIdContext);

    procedure OnServerReadData(Sender: TObject; aData : TDataRec);
  public
    constructor Create(aForm : TForm);
    destructor Destroy; override;
    procedure Start;
    procedure Stop;
  end;

  TForm1 = class(TForm)
    IdTCPServer: TIdTCPServer;
    IdServerIOHandlerStack: TIdServerIOHandlerStack;
    Memo1: TMemo;
    UpdateTimer: TTimer;
    Button1: TButton;
    procedure FormCreate(Sender: TObject);
    procedure FormDestroy(Sender: TObject);
    procedure UpdateTimerTimer(Sender: TObject);
    procedure Button1Click(Sender: TObject);
  private
    { Private-Deklarationen }
    MyServer: TMyTCPServer;
    SL : TStringList;
  public
    { Public-Deklarationen }
    procedure Log(aStr : String);
  end;

var
  Form1: TForm1;

implementation

{$R *.dfm}

{ TDataQueue }

constructor TDataQueue.Create;
begin
  FQueue := TQueue<TDataRec>.Create;
  FLock := TCriticalSection.Create;
end;

destructor TDataQueue.Destroy;
begin
  FQueue.Free;
  FLock.Free;
  inherited;
end;

function TDataQueue.Dequeue: TDataRec;
begin
  FLock.Acquire;
  try
    if FQueue.Count > 0 then
      Result := FQueue.Dequeue
    else
    begin
      SetLength(Result.Daten, 0);
      Result.Context := Nil;
    end;
  finally
    FLock.Release;
  end;
end;

procedure TDataQueue.Enqueue(const Data: TDataRec);
begin
  FLock.Acquire;
  try
    FQueue.Enqueue(Data);
  finally
    FLock.Release;
  end;
end;

{ TProcessingThread }

constructor TProcessingThread.Create(ADataQueue: TDataQueue);
begin
  FDataQueue := ADataQueue;
  Anz := 0;
  inherited Create(False);
end;

procedure TProcessingThread.Log;
begin
  //TForm1(FParent.FForm).Log(LastPrt);
end;

procedure TProcessingThread.Execute;
var
  Data: TDataRec;
begin
  while not Terminated do
  begin
    Data := FDataQueue.Dequeue;
    if Length(Data.Daten) > 0 then
    begin
      if Assigned(OnReceive) then
        OnReceive(Self, Data);

      //TForm1(FParent.FForm).Log('Empfangen: ', Length(Data), ' Bytes' + '- Anz: ' + Anz.ToString);
    end
    else
      Sleep(1);

    if (FDataQueue.FQueue.Count = 0) then
    begin
      //TForm1(FParent.FForm).Log('Gesamtlänge Empfang: ' + Anz.ToString + ' Bytes');
    end;
  end;
end;

{ TMyTCPServer }

constructor TMyTCPServer.Create(aForm : TForm);
begin
  FDataQueue := TDataQueue.Create;
  FSendeDataQueue := TDataQueue.Create;

  LastContext := Nil;

  FProcessingThread := TProcessingThread.Create(FDataQueue);
  FProcessingThread.OnReceive := OnServerReadData;

  FSendeThread := TSendeThread.Create(Self, FSendeDataQueue);

  FForm := aForm;

  LastRecData.Context := Nil;

  FParentServer := TForm1(FForm).IdTCPServer;
  FParentServer.DefaultPort := 5000;
  FParentServer.OnExecute := OnExecuteHandler;
end;

destructor TMyTCPServer.Destroy;
begin
  Stop;
  FreeAndNil(FSendeThread);
  FreeAndNil(FProcessingThread);
  FreeAndNil(FSendeDataQueue);
  FreeAndNil(FDataQueue);
  inherited;
end;

procedure TMyTCPServer.Log;
begin
  TForm1(FForm).Log(LastPrt);
end;

procedure TMyTCPServer.OnExecuteHandler(AContext: TIdContext);
var
  Buffer : TIdBytes;
  RecData : TDataRec;
begin
  if AContext.Connection.IOHandler.InputBuffer.Size > 0 then
  begin
    LastContext := AContext;

    ReadingIsActiv := True;
    while AContext.Connection.IOHandler.InputBuffer.Size > 0 do
    begin
      Inc(FAnzEmpfang);
      Inc(FBytesEmpfang, AContext.Connection.IOHandler.InputBuffer.Size);

      SetLength(Buffer, AContext.Connection.IOHandler.InputBuffer.Size); //<- so viel einlesen wie im Buffer enthalten ist
      AContext.Connection.IOHandler.ReadBytes(Buffer, Length(Buffer), False);

      //Daten in Verarbeitungsliste aufnehmen
      RecData.Daten := Buffer;
      RecData.Context := AContext;

      FDataQueue.Enqueue(RecData);
    end;
    ReadingIsActiv := False;
  end
  else
  begin
    Sleep(1);

    if (FAnzEmpfang <> 0) or (FBytesEmpfang <> 0) then
    begin
      //TForm1(FForm).Log('Receive-Anzahl: ' + FAnzEmpfang.ToString);
      //TForm1(FForm).Log('Receive-Bytes: ' + FBytesEmpfang.ToString);

      FAnzEmpfang := 0;
      FBytesEmpfang := 0;
    end;
  end;
end;

procedure TMyTCPServer.OnServerReadData(Sender: TObject; aData : TDataRec);
var
  IData : AnsiString;
begin
  if not Assigned(aData.Context) then
  begin
    TForm1(FForm).Log('Receive: ' +
      ' Fehler bei Daten von Client: ungültige Context-Angabe');

    Exit;
  end;

  if not Assigned(aData.Context.Binding) then
  begin
    TForm1(FForm).Log('Receive: ' +
      ' Fehler bei Daten von Client: ungültige Binding-Angabe');

    Exit;
  end;

  SetLength(IData,Length(aData.Daten));
  Move(aData.Daten[0],IData[1],Length(aData.Daten));

  LastRecData := aData;

  //irgendwas mit den Daten machen...
end;

procedure TMyTCPServer.Start;
begin
  FParentServer.Active := True;
end;

procedure TMyTCPServer.Stop;
begin
  FParentServer.Active := False;
end;

{ TSendeThread }

constructor TSendeThread.Create(aParent: TMyTCPServer; ADataQueue: TDataQueue);
begin
  FDataQueue := ADataQueue;
  FParent := aParent;

  PrtGes := True;

  Anz := 0;
  inherited Create(False);
end;

procedure TSendeThread.Log;
begin
  TForm1(FParent.FForm).Log(LastPrt);
end;

procedure TSendeThread.Execute;
var
  Data: TDataRec;
begin
  while not Terminated do
  begin
    if Assigned(FParent) and Assigned(FParent.FParentServer) then
    begin
      Data := FDataQueue.Dequeue;
      if Length(Data.Daten) > 0 then
      begin
        Inc(Anz, Length(Data.Daten));

        if FParent.FParentServer.UseNagle then
        begin
          //TForm1(FParent.FForm).Log('01-Server(TSendeThread): UseNagle aktiv');
        end;

        if FParent.ReadingIsActiv then
        begin
          //TForm1(FParent.FForm).Log('01-Server: Lesevorgang parallel aktiv');
        end;

        {
        if Assigned(Data.Context) and Assigned(Data.Context.Connection) then
        begin
          var sw3 := TStopwatch.StartNew;
          var t3 : Int64;

          if Data.Context.Connection.Connected then
          begin
            Data.Context.Connection.IOHandler.WriteDirect(Data.Daten);

            //TForm1(FParent.FForm).Log('01-Server: Gesendet. Restanzahl: ' + FDataQueue.FQueue.Count.ToString);
          end;

          t3 := sw3.ElapsedMilliseconds; //Zeitmessung stoppen
          if t3 > 50 then
          begin
            //TForm1(FParent.FForm).Log('Zeitdauer Senden: [' + t3.ToString + ']');
          end;

        end;
        }

        if Assigned(FParent.LastContext) and Assigned(FParent.LastContext.Connection) then
        begin
          var sw3 := TStopwatch.StartNew;
          var t3 : Int64;

          if FParent.LastContext.Connection.Connected then
          begin
            FParent.LastContext.Connection.IOHandler.WriteDirect(Data.Daten);

            //TForm1(FParent.FForm).Log('01-Server: Gesendet. Restanzahl: ' + FDataQueue.FQueue.Count.ToString);
          end;

          t3 := sw3.ElapsedMilliseconds; //Zeitmessung stoppen
          if t3 > 50 then
          begin
            //TForm1(FParent.FForm).Log('Zeitdauer Senden: [' + t3.ToString + ']');
          end;

        end;
      end
      else
        Sleep(1);
    end;
  end;
end;

procedure TForm1.Button1Click(Sender: TObject);
var
  TestData: TDataRec;
  tmpInt : Integer;
begin
  if not Assigned(MyServer) then
    Exit;

  for var i := 1 to 100 do
  begin
    tmpInt := Random(60000);
    if tmpInt < 10 then
      tmpInt := 10;

    SetLength(TestData.Daten, 60000);
    FillChar(TestData.Daten[0], Length(TestData.Daten), 65);

    TestData.Context := Nil;
    if Assigned(MyServer.LastRecData.Context) then
      TestData.Context := MyServer.LastRecData.Context;

    MyServer.FSendeDataQueue.Enqueue(TestData);
  end;
end;

procedure TForm1.FormCreate(Sender: TObject);
begin
  Randomize;

  SL := TStringList.Create;
  Memo1.Clear;

  try
    MyServer := TMyTCPServer.Create(Self);
    MyServer.Start;

    Log('Server läuft auf Port 5000');
  except
    on E: Exception do
      Log('Fehler: ' + E.Message);
  end;
end;

procedure TForm1.FormDestroy(Sender: TObject);
begin
  MyServer.Stop;
  FreeAndNil(MyServer);
  FreeAndNil(SL);
end;

procedure TForm1.Log(aStr : String);
begin
  Exit;

  System.TMonitor.Enter(SL);
  try
    SL.Add(aStr);

    if UpdateTimer.Enabled then
      Exit;

    UpdateTimer.Enabled := True;
  finally
    System.TMonitor.Exit(SL);
  end;
end;

procedure TForm1.UpdateTimerTimer(Sender: TObject);
begin
  Exit;

  UpdateTimer.Enabled := False;

  System.TMonitor.Enter(SL);
  try
    Memo1.Lines.Text := SL.Text;
  finally
    System.TMonitor.Exit(SL);
  end;
end;

end.
Ok, habe die Kommentare gesehen. Ich gucke wegen dem Timer und der VCL

Edit: TMonitor-Synchronisation für Stringliste eingefügt. Keine Änderung am Verhalten danach

Geändert von AJ_Oldendorf ( 9. Apr 2025 um 09:45 Uhr)
  Mit Zitat antworten Zitat