Einzelnen Beitrag anzeigen

peterbelow

Registriert seit: 12. Jan 2019
Ort: Hessen
701 Beiträge
 
Delphi 12 Athens
 
#2

AW: Serielle Kommunikation in eine Art queue auslagern

  Alt 7. Mär 2019, 14:27
Thomas, Dein Ansatz (ein Thread mit einer Queue von abzuarbeitenden Kommandos) ist genau richtig für das Problem.

Zu beachten dabei ist folgendes:

* Die Queue muss thread-safe sein, da der UI-Thread Kommandos hineinschreibt und der Arbeitsthread sie herausholt.
* Es muss einen Mechanismus geben, der dem Thread mitteilt, das Arbeit in der Queue ist, wenn der UI-Thread etwas hinzufügt.
* Die Interaktion mit der seriellen Schnittstelle sollte vollständig innerhalb des Arbeitsthreads erfolgen.
* Der Arbeitsthread sollte kontrolliert beendet werden können, wenn das Programm die Kommunikation mit dem externen Partner beenden will.
* Alle Daten (Fortschritt, Fehler), die der Arbeitsthread an den UI-Thread schicken will müssen per Synchronize übermittelt werden, so dass die Daten im Kontext des UI-Threads verarbeitet werden.

Fangen wir also mal mit einer von TThread abgeleiteten Klasse an. Wir überschrieben die Execute-Methode, deren Kode innerhalb des sekundären Threads abgearbeitet werden soll.

Der generelle Aufbau (inklusive Fehlerbehandlung) sieht so aus:
Delphi-Quellcode:
procedure TSerialComThread.Execute;
begin
  try
    InitializeSerialPort;
    try
      while not Terminated do begin
        WaitForWork;
        while not Terminated and MoreWorkAvailable do
          ProcessOneCommand;
      end;
    finally
      CloseSerialPort;
    end;
  except
    On E: Exception do
      ReportError(MakeErrorMsg(E), ERROR_IS_FATAL);
  end;
end;
Der Plan ist, dass der Thread sofort nachdem er erzeugt wurde den seriellen Port öffnet und dann wartet, bis ein Kommando zu Bearbeitung ansteht. Wenn das der Fall ist arbeitet er alle Kommandos ab, die in der Queue stehen, bis die Queue leer ist, dann geht er wieder in den Wartezustand.

Jetzt müssen wir nur noch die ganzen Methoden implementieren, die in Execute aufgerufen werden. Als erstes brauchen wir natürlich eine dem Problem angemessene Queue-Klasse. Da deine Kommandos vermutlich in Strings abgelegt sind können wir mit einer TQueue<string> aus der mitgelieferten Unit von generics beginnen. Da die Queue aber thread-safe sein soll verpacken wir sie in eine eigene Klasse, die den Zugriff auf die interne Queue regelt.

Delphi-Quellcode:
type
  TCommandQueue = class
  strict private
    FCommands: TQueue<string>;
    FSignal: TSimpleEvent;
  strict protected
    property Commands: TQueue<string> read FCommands;
    property Signal: TSimpleEvent read FSignal;
  public
    constructor Create;
    destructor Destroy; override;
    function HasCommands: boolean;
    function Pop: string;
    procedure Push(const aCommand: string);
    procedure SetEvent;
    procedure WaitFor;
  end;
Die Push, Pop und HasCommands-Methoden müssen thread-safe sein, Push löst das Signal aus, auf das WaitFor (in einem anderen Thread) wartet. SetEvent erlaubt es, das Signal von aussen auszulösen, wenn der Thread beendet werden soll.

Die fertige Unit (völlig ungetestet!) sieht dann etwa wie folgt aus. Die TODO-Sachen sind dann dein Bier.

Delphi-Quellcode:

unit SerialComThreadU;

interface

uses
  System.Sysutils, System.Generics.Collections,
  System.Classes, System.SyncObjs;

type
  TCommandQueue = class
  strict private
    FCommands: TQueue<string>;
    FSignal: TSimpleEvent;
  strict protected
    property Commands: TQueue<string> read FCommands;
    property Signal: TSimpleEvent read FSignal;
  public
    constructor Create;
    destructor Destroy; override;
    function HasCommands: boolean;
    function Pop: string;
    procedure Push(const aCommand: string);
    procedure SetEvent;
    procedure WaitFor;
  end;

  TSerialComThread = class(TThread)
  strict private
    type
      TThreadErrorEvent = procedure (sender: TSerialComThread;
         const ErrMsg: string; isFatal: boolean);
      TThreadProgress = procedure (sender: TSerialComThread;
         const Command: string);
    var
    FCommands: TCommandQueue;
    FOnProgress: TThreadProgress;
    FOnThreadError: TThreadErrorEvent;
    procedure CloseSerialPort;
    procedure InitializeSerialPort;
    function MakeErrorMsg(aException: Exception): string;
    function MoreWorkAvailable: boolean;
    procedure ProcessOneCommand;
    procedure ReportError(const aErrorMsg: string;
      aIsFatalError: boolean = false);
    procedure WaitForWork;
  strict protected
    procedure DoProgress(const aCommand: string);
    procedure DoThreadError(const aErrorMsg: string; aIsFatalError: boolean =
        false);
    property Commands: TCommandQueue read FCommands;
  protected
    procedure Execute; override;
    procedure TerminatedSet; override;
  public
    constructor Create;
    destructor Destroy; override;
    procedure AddCommand(const aCommand: string);
    property OnProgress: TThreadProgress read FOnProgress write FOnProgress;
    property OnThreadError: TThreadErrorEvent read FOnThreadError write
        FOnThreadError;
  end;



implementation

resourcestring
  SThreadErrorMask =
    '%s ist auf eine Ausnahme vom Type %s gelaufen.'+SLinebreak+'%s';

const
  ERROR_IS_FATAL = true;

{== TSerialComThread ==================================================}


constructor TSerialComThread.Create;
begin
  FCommands := TCommandQueue.Create();
  inherited Create(false);
end;

destructor TSerialComThread.Destroy;
begin
  FCommands.Free;
  inherited Destroy;
end;

procedure TSerialComThread.AddCommand(const aCommand: string);
begin
  Commands.Push(aCommand);
end;

procedure TSerialComThread.CloseSerialPort;
begin
  // TODO -cMM: TSerialComThread.CloseSerialPort implement
end;

procedure TSerialComThread.DoProgress(const aCommand: string);
begin
  if Assigned(FOnProgress) then
    Synchronize(
      procedure begin
        FOnProgress(Self, aCommand);
      end);
end;

procedure TSerialComThread.DoThreadError(const aErrorMsg: string;
    aIsFatalError: boolean = false);
begin
  if Assigned(FOnThreadError) then
    Synchronize(
      procedure begin
        FOnThreadError(Self, aErrorMsg, aIsFatalError);
      end);
end;

procedure TSerialComThread.Execute;
begin
  try
    InitializeSerialPort;
    try
      while not Terminated do begin
        WaitForWork;
        while not Terminated and MoreWorkAvailable do
          ProcessOneCommand;
      end;
    finally
      CloseSerialPort;
    end;
  except
    On E: Exception do
      ReportError(MakeErrorMsg(E), ERROR_IS_FATAL);
  end;
end;

procedure TSerialComThread.InitializeSerialPort;
begin
  // TODO -cMM: TSerialComThread.InitializeSerialPort implement
end;

function TSerialComThread.MakeErrorMsg(aException: Exception): string;
begin
  Result := Format(SThreadErrorMask,
    [Classname, aException.ClassName, aException.Message]);
end;

function TSerialComThread.MoreWorkAvailable: boolean;
begin
  Result := Commands.HasCommands;
end;

procedure TSerialComThread.ProcessOneCommand;
var
  LCommand: string;
begin
  LCommand := Commands.Pop;
  try
    // TODO : send command over the port
    DoProgress(LCommand);
  except
    on E: Exception do
      ReportError(MakeErrorMsg(E), not ERROR_IS_FATAL);
  end;
end;

procedure TSerialComThread.ReportError(const aErrorMsg: string; aIsFatalError:
    boolean = false);
begin
  DoThreadError(aErrorMsg, aIsFatalError);
end;

procedure TSerialComThread.TerminatedSet;
begin
  inherited;
  Commands.SetEvent;
end;

procedure TSerialComThread.WaitForWork;
begin
  Commands.Waitfor;
end;

{== TCommandQueue =====================================================}


constructor TCommandQueue.Create;
begin
  inherited Create;
  FCommands := TQueue<string>.Create();
  FSignal := TSimpleEvent.Create();
end;

destructor TCommandQueue.Destroy;
begin
  FreeAndNil(FSignal);
  FreeAndNil(FCommands);
  inherited Destroy;
end;

function TCommandQueue.HasCommands: boolean;
begin
  MonitorEnter(self);
  try
    Result:= Commands.Count > 0;
  finally
    MonitorExit(self);
  end;
end;

function TCommandQueue.Pop: string;
begin
  MonitorEnter(self);
  try
    Result := Commands.Dequeue;
  finally
    MonitorExit(self);
  end;
end;

procedure TCommandQueue.Push(const aCommand: string);
begin
  MonitorEnter(self);
  try
    Commands.Enqueue(aCommand);
    Signal.SetEvent;
  finally
    MonitorExit(self);
  end;
end;

procedure TCommandQueue.SetEvent;
begin
  Signal.SetEvent;
end;

procedure TCommandQueue.WaitFor;
begin
  Signal.WaitFor;
  Signal.ResetEvent;
end;

end.
Peter Below
  Mit Zitat antworten Zitat