Einzelnen Beitrag anzeigen

AJ_Oldendorf

Registriert seit: 12. Jun 2009
440 Beiträge
 
Delphi 12 Athens
 
#46

AW: schnelle Server Client Verbindung ohne Verluste

  Alt 8. Apr 2025, 11:43
Hier ist eine Demo, womit sich das Verhalten fast 1:1 nachstellen lässt.
Ich habe die nur schnell zusammen geschustert

Vorgehensweise:
1)Server-Anwendung starten
2)Client-Anwendung starten
3)Client Button Klick -> Pakete senden
4)Server empfängt diese und zeigt es im Memo an
5)Server Button Klick -> Pakete werden nur 5 Stück geschickt (siehe Protokollierung Memo) und der Sendethread hängt irgendwo im Write-Aufruf da auf ein Breakpoint im while not Terminated nicht reagiert wird
6)Nochmal Daten vom Client senden über den Button Klick
7)Server verschickt die restlichen Telegramme (siehe Memo) und empfängt danach (da sieht man dann auch, dass der Write Aufruf mehrere Sekunden hing)

Hat jemand eine Idee dazu?

Client:
Delphi-Quellcode:
unit Unit1;

interface

uses
  Winapi.Windows, Winapi.Messages, System.SysUtils, System.Variants, System.Classes, Vcl.Graphics,
  Vcl.Controls, Vcl.Forms, Vcl.Dialogs, IdIOHandler, IdIOHandlerSocket,
  IdIOHandlerStack, IdBaseComponent, IdComponent, IdTCPConnection, IdTCPClient,
  System.SyncObjs, IdContext, IdGlobal, System.Generics.Collections,
  System.Diagnostics, Vcl.StdCtrls, Vcl.ExtCtrls;

type
  TDataQueue = class
  private
    FQueue: TQueue<TIdBytes>;
    FLock: TCriticalSection;
  public
    constructor Create;
    destructor Destroy; override;
    procedure Enqueue(const Data: TIdBytes);
    function Dequeue: TIdBytes;
  end;

  TProcessingThread = class(TThread)
  private
    FDataQueue: TDataQueue;

    Anz : LongWord;
  protected
    procedure Execute; override;
  public
    constructor Create(ADataQueue: TDataQueue);
  end;

  TMyTCPClient = class
  private
    FParentServer : TIdTCPClient;
    FForm : TForm;
  public
    constructor Create(aForm : TForm);
    destructor Destroy; override;
    procedure Connect(const AHost: string; APort: Integer);
    procedure Disconnect;
    procedure SendData(const Data: TIdBytes);
  end;

  TForm1 = class(TForm)
    IdTCPClient1: TIdTCPClient;
    IdIOHandlerStack1: TIdIOHandlerStack;
    Memo1: TMemo;
    UpdateTimer: TTimer;
    Button1: TButton;
    procedure UpdateTimerTimer(Sender: TObject);
    procedure FormCreate(Sender: TObject);
    procedure FormDestroy(Sender: TObject);
    procedure Button1Click(Sender: TObject);
  private
    { Private-Deklarationen }
    MyClient: TMyTCPClient;
    SL : TStringList;
  public
    { Public-Deklarationen }
    procedure Log(aStr : String);
  end;

var
  Form1: TForm1;

implementation

{$R *.dfm}

{ TDataQueue }

constructor TDataQueue.Create;
begin
  FQueue := TQueue<TIdBytes>.Create;
  FLock := TCriticalSection.Create;
end;

destructor TDataQueue.Destroy;
begin
  FQueue.Free;
  FLock.Free;
  inherited;
end;

function TDataQueue.Dequeue: TIdBytes;
begin
  FLock.Acquire;
  try
    if FQueue.Count > 0 then
      Result := FQueue.Dequeue
    else
      SetLength(Result, 0);
  finally
    FLock.Release;
  end;
end;

procedure TDataQueue.Enqueue(const Data: TIdBytes);
begin
  FLock.Acquire;
  try
    FQueue.Enqueue(Data);
  finally
    FLock.Release;
  end;
end;

{ TProcessingThread }

constructor TProcessingThread.Create(ADataQueue: TDataQueue);
begin
  FDataQueue := ADataQueue;
  Anz := 0;
  inherited Create(False);
end;

procedure TProcessingThread.Execute;
var
  Data: TIdBytes;
begin
  while not Terminated do
  begin

  end;
end;

{ TMyTCPClient }

procedure TMyTCPClient.Connect(const AHost: string; APort: Integer);
begin
  FParentServer.Host := AHost;
  FParentServer.Port := APort;
  FParentServer.ConnectTimeout := 5000; // 5 Sekunden Timeout
  FParentServer.ReadTimeout := 5000; // 5 Sekunden Timeout für Lesevorgänge
  FParentServer.Connect;
  TForm1(FForm).Log('Verbunden mit ' + AHost + ':' + APort.ToString);
end;

constructor TMyTCPClient.Create(aForm : TForm);
begin
  FForm := aForm;

  FParentServer := TForm1(FForm).IdTCPClient1;
end;

destructor TMyTCPClient.Destroy;
begin
  Disconnect;
  inherited;
end;

procedure TMyTCPClient.Disconnect;
begin
  if FParentServer.Connected then
  begin
    FParentServer.Disconnect;
    TForm1(FForm).Log('Verbindung getrennt.');
  end;
end;

procedure TMyTCPClient.SendData(const Data: TIdBytes);
begin
  if FParentServer.Connected then
  begin
    FParentServer.IOHandler.WriteDirect(Data);
    //TForm1(FForm).Log(Now, ' Gesendet: ', Length(Data), ' Bytes');
  end
  else
  begin
    FParentServer.Connect;
    //TForm1(FForm).Log('Fehler: Nicht verbunden.');
  end;
end;

procedure TForm1.Button1Click(Sender: TObject);
var
  TestData: TIdBytes;
  Anz : LongWord;
begin
  if not Assigned(MyClient) then
    Exit;

  var sw3 := TStopwatch.StartNew;
  var t3 : Int64;

  SetLength(TestData, 61000); //1024
  FillChar(TestData[0], Length(TestData), 65);

  Anz := 0;

  for var i := 1 to 200 do
  begin
    Inc(Anz, Length(TestData));

    MyClient.SendData(TestData);
  end;

  t3 := sw3.ElapsedMilliseconds; //Zeitmessung stoppen
  Log('Zeitdauer: ' + t3.ToString + ' ms');

  Log('Gesamtlänge: ' + Anz.ToString + ' Bytes');
end;

procedure TForm1.FormCreate(Sender: TObject);
begin
  SL := TStringList.Create;
  Memo1.Clear;

  try
    MyClient := TMyTCPClient.Create(Self);
    try
      MyClient.Connect('127.0.0.1', 5000);

      var sw3 := TStopwatch.StartNew;
      var t3 : Int64;
    finally

    end;
  except
    on E: Exception do
      Log('Fehler: ' + E.Message);
  end;
end;

procedure TForm1.FormDestroy(Sender: TObject);
begin
  MyClient.Disconnect;
  FreeAndNil(MyClient);
  FreeAndNil(SL);
end;

procedure TForm1.Log(aStr : String);
begin
  SL.Add(aStr);

  if UpdateTimer.Enabled then
    Exit;

  UpdateTimer.Enabled := True;
end;

procedure TForm1.UpdateTimerTimer(Sender: TObject);
begin
  UpdateTimer.Enabled := False;

  Memo1.Lines.Text := SL.Text;
end;

end.
Server:
Delphi-Quellcode:
unit Unit1;

interface

uses
  Winapi.Windows, Winapi.Messages, System.SysUtils, System.Variants,
  System.Classes, Vcl.Graphics, Vcl.Controls, Vcl.Forms, Vcl.Dialogs,
  IdServerIOHandler, IdServerIOHandlerSocket, IdServerIOHandlerStack,
  IdBaseComponent, IdComponent, IdCustomTCPServer, IdTCPServer, System.SyncObjs,
  System.Generics.Collections, System.Diagnostics, IdGlobal, IdContext,
  Vcl.StdCtrls, Vcl.ExtCtrls;

type
  TMyTCPServer = class;

  TDataRec = record
    Daten : TIdBytes;
    Context : TIdContext;
  end;

  TReceiveEvent = procedure(Sender: TObject; aData : TDataRec) of Object;

  TDataQueue = class
  private
    FQueue: TQueue<TIdBytes>;
    FLock: TCriticalSection;
  public
    constructor Create;
    destructor Destroy; override;
    procedure Enqueue(const Data: TIdBytes);
    function Dequeue: TIdBytes;
  end;

  TProcessingThread = class(TThread)
  private
    FDataQueue: TDataQueue;

    Anz : LongWord;
  protected
    procedure Execute; override;
  public
    OnReceive : TReceiveEvent;

    constructor Create(ADataQueue: TDataQueue);
  end;

  TSendeThread = class(TThread)
  private
    FDataQueue: TDataQueue;
    FParent : TMyTCPServer;
    PrtGes : Boolean;

    Anz : LongWord;
  protected
    procedure Execute; override;
  public
    constructor Create(aParent : TMyTCPServer; ADataQueue: TDataQueue);
  end;

  TMyTCPServer = class
  private
    FDataQueue: TDataQueue;
    FSendeDataQueue : TDataQueue;
    FParentServer : TIdTCPServer;
    FForm : TForm;

    FProcessingThread: TProcessingThread;
    FSendeThread: TSendeThread;
    FLastContext : TIdContext;
    FAnzEmpfang : LongWord;
    FBytesEmpfang : LongWord;

    ReadingIsActiv : Boolean;

    procedure OnExecuteHandler(AContext: TIdContext);

    procedure OnServerReadData(Sender: TObject; aData : TDataRec);
  public
    constructor Create(aForm : TForm);
    destructor Destroy; override;
    procedure Start;
    procedure Stop;
  end;

  TForm1 = class(TForm)
    IdTCPServer: TIdTCPServer;
    IdServerIOHandlerStack: TIdServerIOHandlerStack;
    Memo1: TMemo;
    UpdateTimer: TTimer;
    Button1: TButton;
    procedure FormCreate(Sender: TObject);
    procedure FormDestroy(Sender: TObject);
    procedure UpdateTimerTimer(Sender: TObject);
    procedure Button1Click(Sender: TObject);
  private
    { Private-Deklarationen }
    MyServer: TMyTCPServer;
    SL : TStringList;
  public
    { Public-Deklarationen }
    procedure Log(aStr : String);
  end;

var
  Form1: TForm1;

implementation

{$R *.dfm}

{ TDataQueue }

constructor TDataQueue.Create;
begin
  FQueue := TQueue<TIdBytes>.Create;
  FLock := TCriticalSection.Create;
end;

destructor TDataQueue.Destroy;
begin
  FQueue.Free;
  FLock.Free;
  inherited;
end;

function TDataQueue.Dequeue: TIdBytes;
begin
  FLock.Acquire;
  try
    if FQueue.Count > 0 then
      Result := FQueue.Dequeue
    else
      SetLength(Result, 0);
  finally
    FLock.Release;
  end;
end;

procedure TDataQueue.Enqueue(const Data: TIdBytes);
begin
  FLock.Acquire;
  try
    FQueue.Enqueue(Data);
  finally
    FLock.Release;
  end;
end;

{ TProcessingThread }

constructor TProcessingThread.Create(ADataQueue: TDataQueue);
begin
  FDataQueue := ADataQueue;
  Anz := 0;
  inherited Create(False);
end;

procedure TProcessingThread.Execute;
var
  Data: TIdBytes;
begin
  while not Terminated do
  begin
    Data := FDataQueue.Dequeue;
    if Length(Data) > 0 then
    begin
      Inc(Anz, Length(Data));

      //TForm1(FParent.FForm).Log('Empfangen: ', Length(Data), ' Bytes' + '- Anz: ' + Anz.ToString);
    end
    else
      Sleep(1);

    if (FDataQueue.FQueue.Count = 0) then
    begin
      //TForm1(FParent.FForm).Log('Gesamtlänge Empfang: ' + Anz.ToString + ' Bytes');
    end;
  end;
end;

{ TMyTCPServer }

constructor TMyTCPServer.Create(aForm : TForm);
begin
  FDataQueue := TDataQueue.Create;
  FSendeDataQueue := TDataQueue.Create;
  FLastContext := Nil;

  FProcessingThread := TProcessingThread.Create(FDataQueue);
  FProcessingThread.OnReceive := OnServerReadData;

  FSendeThread := TSendeThread.Create(Self, FSendeDataQueue);

  FForm := aForm;

  FParentServer := TForm1(FForm).IdTCPServer;
  FParentServer.DefaultPort := 5000;
  FParentServer.OnExecute := OnExecuteHandler;
end;

destructor TMyTCPServer.Destroy;
begin
  Stop;
  FreeAndNil(FSendeThread);
  FreeAndNil(FProcessingThread);
  FreeAndNil(FSendeDataQueue);
  FreeAndNil(FDataQueue);
  inherited;
end;

procedure TMyTCPServer.OnExecuteHandler(AContext: TIdContext);
var
  Buffer: TIdBytes;
begin
  FLastContext := AContext;
  if AContext.Connection.IOHandler.InputBuffer.Size > 0 then
  begin
    ReadingIsActiv := True;
    while AContext.Connection.IOHandler.InputBuffer.Size > 0 do
    begin
      Inc(FAnzEmpfang);
      Inc(FBytesEmpfang, AContext.Connection.IOHandler.InputBuffer.Size);

      SetLength(Buffer, AContext.Connection.IOHandler.InputBuffer.Size); //<- so viel einlesen wie im Buffer enthalten ist
      AContext.Connection.IOHandler.ReadBytes(Buffer, Length(Buffer), False);
      FDataQueue.Enqueue(Buffer);
    end;
    ReadingIsActiv := False;
  end
  else
  begin
    Sleep(1);

    if (FAnzEmpfang <> 0) or (FBytesEmpfang <> 0) then
    begin
      TForm1(FForm).Log('Receive-Anzahl: ' + FAnzEmpfang.ToString);
      TForm1(FForm).Log('Receive-Bytes: ' + FBytesEmpfang.ToString);

      FAnzEmpfang := 0;
      FBytesEmpfang := 0;
    end;
  end;
end;

procedure TMyTCPServer.OnServerReadData(Sender: TObject; aData : TDataRec);
var
  IData : AnsiString;
begin
  if not Assigned(aData.Context) then
  begin
    TForm1(FForm).Log('Receive: ' +
      ' Fehler bei Daten von Client: ungültige Context-Angabe');

    Exit;
  end;

  if not Assigned(aData.Context.Binding) then
  begin
    TForm1(FForm).Log('Receive: ' +
      ' Fehler bei Daten von Client: ungültige Binding-Angabe');

    Exit;
  end;

  SetLength(IData,Length(aData.Daten));
  Move(aData.Daten[0],IData[1],Length(aData.Daten));

  //irgendwas mit den Daten machen...
end;

procedure TMyTCPServer.Start;
begin
  FParentServer.Active := True;
end;

procedure TMyTCPServer.Stop;
begin
  FParentServer.Active := False;
end;

{ TSendeThread }

constructor TSendeThread.Create(aParent: TMyTCPServer; ADataQueue: TDataQueue);
begin
  FDataQueue := ADataQueue;
  FParent := aParent;

  PrtGes := True;

  Anz := 0;
  inherited Create(False);
end;

procedure TSendeThread.Execute;
var
  Data: TIdBytes;
begin
  while not Terminated do
  begin
    if Assigned(FParent) and Assigned(FParent.FParentServer) then
    begin
      Data := FDataQueue.Dequeue;
      if Length(Data) > 0 then
      begin
        Inc(Anz, Length(Data));

        if FParent.FParentServer.UseNagle then
          TForm1(FParent.FForm).Log('01-Server: UseNagle aktiv');

        if FParent.ReadingIsActiv then
          TForm1(FParent.FForm).Log('01-Server: Lesevorgang parallel aktiv');

        if Assigned(FParent.FLastContext) then
        begin
          var sw3 := TStopwatch.StartNew;
          var t3 : Int64;

          if FParent.FLastContext.Connection.Connected then
          begin
            FParent.FLastContext.Connection.IOHandler.WriteDirect(Data);

            TForm1(FParent.FForm).Log('01-Server: Gesendet. Restanzahl: ' + FDataQueue.FQueue.Count.ToString);
          end;

          t3 := sw3.ElapsedMilliseconds; //Zeitmessung stoppen
          if t3 > 50 then
            TForm1(FParent.FForm).Log('Zeitdauer Senden: [' + t3.ToString + ']');

        end;
      end
      else
        Sleep(1);
    end;
  end;
end;

procedure TForm1.Button1Click(Sender: TObject);
var
  TestData: TIdBytes;
  tmpInt : Integer;
begin
  if not Assigned(MyServer) then
    Exit;

  for var i := 1 to 100 do
  begin
    tmpInt := Random(60000);
    if tmpInt < 10 then
      tmpInt := 10;

    SetLength(TestData, 60000);
    FillChar(TestData[0], Length(TestData), 65);

    MyServer.FSendeDataQueue.Enqueue(TestData);
  end;
end;

procedure TForm1.FormCreate(Sender: TObject);
begin
  Randomize;

  SL := TStringList.Create;
  Memo1.Clear;

  try
    MyServer := TMyTCPServer.Create(Self);
    MyServer.Start;

    Log('Server läuft auf Port 5000');
  except
    on E: Exception do
      Log('Fehler: ' + E.Message);
  end;
end;

procedure TForm1.FormDestroy(Sender: TObject);
begin
  MyServer.Stop;
  FreeAndNil(MyServer);
  FreeAndNil(SL);
end;

procedure TForm1.Log(aStr : String);
begin
  SL.Add(aStr);

  if UpdateTimer.Enabled then
    Exit;

  UpdateTimer.Enabled := True;
end;

procedure TForm1.UpdateTimerTimer(Sender: TObject);
begin
  UpdateTimer.Enabled := False;

  Memo1.Lines.Text := SL.Text;
end;

end.
Angehängte Dateien
Dateityp: zip Client.zip (7,0 KB, 4x aufgerufen)
Dateityp: zip Server.zip (7,7 KB, 4x aufgerufen)
  Mit Zitat antworten Zitat