Einzelnen Beitrag anzeigen

Medium

Registriert seit: 23. Jan 2008
3.686 Beiträge
 
Delphi 2007 Enterprise
 
#1

Die lieben Threads mal wieder, es Fehlert so rum

  Alt 25. Jul 2011, 12:33
Aloah!
So ganz so grün bin ich in Sachen Multithreading ja eigentlich nicht - dachte ich - aber in meinem aktuellen Projekt hüpft mir gelegentlich dann doch die gelegentliche AV ins Gesicht.
Der Sinn des Teils ist: Es gibt eine Liste von Anfragen, die via TCP/IP an ein anderes System gesendet werden. Da die Antworten dieses Systems keine Zuordnung zur gemachten Anfrage zulassen, muss auf die Antwort gewartet werden, bevor eine nächste geschickt werden kann. Die Anfragen werden in einem definierten Zyklus verschickt, so dass ich das in einen Thread ausgelagert habe, der immer wieder diese Anfragen abschickt wenn sie anstehen, und zurück gelieferte Antworten verarbeitet (=Daten in meine DB schreibt).
Ich versuche mal die grobe Struktur darzustellen:

Delphi-Quellcode:
unit MyThreads;

interface

type
  TFetchThread = class(TThread)
  private
    FetchGroups: TFetchGroupList; // Eine List mit Instanzen von TFetchGroup, die sind nichts großartig spannendes. Reine Datenhalter.
    Sock: TClientSocket;
    Con: TUniConnection;
    Qry: TUniQuery;
    CycleThread: TCycleThread;
    NewDataArrived: Boolean;
    WaitingForReply: Boolean;
    SockForm: TForm;
    procedure IssueFetch(fetchGroupIndex: Integer);
    procedure HandleFetchReply(aReplyBuffer: TSockBuffer);
    procedure MakeDBEntriesForGroup(groupID: Integer; buf: TFetchEntryBuffer);
    procedure InitIssueFetch;
  protected
    procedure Execute; override;
  public
    constructor Create(aSocket: TClientSocket; aSocketForm: TForm; aDBConnection: TUniConnection);
    destructor Destroy; override;
    procedure SocketDataReady(aBuffer: TSockBuffer);
  end;

implementation

constructor TFetchThread.Create(aSocket: TClientSocket; aSocketForm: TForm; aDBConnection: TUniConnection);
var
  i, m: Integer;
begin
  inherited Create(true);

  Sock := aSocket; // Socket kommt vom MainForm
  SockForm := aSocketForm; // Das Handle brauche ich später auch

  // Eigene Connection für den Thread erstellen, Daten von der übergebenen Connection nehmen
  try
    Con := TUniConnection.Create(nil);
    Con.ProviderName := aDBConnection.ProviderName;
    Con.Server := aDBConnection.Server;
    Con.Database := aDBConnection.Database;
    Con.Port := aDBConnection.Port;
    Con.Username := aDBConnection.Username;
    Con.Password := aDBConnection.Password;
    Con.LoginPrompt := false;
    Con.Connect;
  except
    Con.Free;
    raise Exception.Create('Mäh');
  end;

  // Thread-eigene Query
  try
    Qry := TUniQuery.Create(nil);
    Qry.Connection := Con;
  except
    Qry.Free;
    raise Exception.Create('Muh');
  end;

  // Gruppen füllen, dies ist die einzige Stelle, an der an diesen strukturell etwas gemacht wird
  FetchGroups := TFetchFunctions.MakeFetchGroups(Qry);

  CycleThread := TCycleThread.Create;
  CycleThread.Items := FetchGroups;
  CycleThread.Resume;

  self.Resume;
end;

// WaitingForReply heisst, es wurde eine Anfrage gesendet, die Antwort steht noch aus. In der Zeit darf nichts gemacht werden.
// NewDataArrived wird im SocketRead gesetzt, wenn die erwartete Menge Bytes angekommen und valide ist.
procedure TFetchThread.Execute;
var
  i, k: Integer;
  lowCycle: Integer;
begin
  repeat
    if not WaitingForReply then
    begin
      // Gruppe suchen, die aktualisiert werden muss (die, die am längsten überfällig ist)
      lowCycle := 0;
      k := -1;
      for i := 0 to FetchGroups.Count-1 do
      begin
        if FetchGroups[i].CurrentCycleTime < lowCycle then
        begin
          lowCycle := FetchGroups[i].CurrentCycleTime;
          k := i;
        end;
      end;
      // Wurde eine gefunden, dann via Socket-Kompo auf dem MainForm die Anfrage senden
      if (k >= 0) then
      begin
        IssueFetchID := k;
        Synchronize(InitIssueFetch);
      end;
    end
    else
    begin
      // Wird eine Antwort zu einer Anfrage erwartet, und es ist eine eingetroffen, diese nun verarbeiten.
      if NewDataArrived then
        HandleFetchReply(ReplyBuffer); // Der Buffer ist ein array[0..8191] of Byte, mehr kommt definitiv nicht
    end;
    Sleep(1);
  until Terminated;
end;

procedure TFetchThread.HandleFetchReply(aReplyBuffer: TSockBuffer);
var
  h: TFetchAnswerHeader; // ein record
  isValid: Boolean;
  buf: TFetchEntryBuffer; // array of Byte
begin
  Move(aReplyBuffer[0], h, SizeOf(TFetchAnswerHeader));
  // Header auf Validität prüfen
  isValid := DiverseDinge;
  if isValid then
  begin
    // ReplyGroupID wird in InitIssueFetch() gesetzt, und ist der Index der zuletzt angefragen Fetchgruppe in der Liste dieser
    SetLength(buf, FetchGroups[ReplyGroupID].ByteCount);
    Move(aReplyBuffer[SizeOf(TFetchAnswerHeader)], buf[0], FetchGroups[ReplyGroupID].ByteCount);
    MakeDBEntriesForGroup(ReplyGroupID, buf);
  end;

  NewDataArrived := false;
  WaitingForReply := false;
end;

// Mein Sorgenkind! Das erstellen des TDBEntryThreads knallt sporadisch, und zwar beim Erzeugen selbt.
// Der Konstruktur läuft sauber durch, das hab ich getestet, dennoch gibt's ab und an eine AV an unterschiedlichen
// Adressen. Mal 0, mal $FFFFFFFF, mal irgendwas im Codesegment, und dort immer leicht verschiedene.
// Das try..except hier ist nur ein Workaround, den ich gern los wäre. Zudem greift dies nicht immer, selten springt
// Delphi auch gleich ins CPU-Fenster, obwohl "bei Delphi-Exceptions stoppen" aus ist. Igitt!
procedure TFetchThread.MakeDBEntriesForGroup(groupID: Integer; buf: TFetchEntryBuffer);
var
  group: TFetchGroup;
  threadOkay: Boolean;
begin
  group := FetchGroups[groupID];
  threadOkay := false;
  repeat
    try
      TDBEntryThread.Create(Qry.Connection, group, buf, SockForm.Handle);
      threadOkay := true;
    except
    end;
    Sleep(1);
  until threadOkay;
end;

// Das ist besagter Konstruktor. Ein paar private Felder werden gesetzt, viel mehr nicht.
constructor TDBEntryThread.Create(aCon: TUniConnection; aGroup: TFetchGroup; aBuf: TFetchEntryBuffer; aWnd: HWND);
begin
  inherited Create(true);
  Con := aCon;
  Sql := TUniSQL.Create(nil);
  Sql.Connection := Con;
  Group := aGroup;
  Buf := aBuf;
  Wnd := aWnd;
  FreeOnTerminate := true;
  Resume;
end;

// Und hier der zugehörige Workload
procedure TDBEntryThread.Execute;
begin
  // Hier wird anhand von Infos aus der "Group" der Empfagspuffer interpretiert. Group wird dabei nur lesend angefasst.
  // Da das recht viel ist, durch diesen Kommentar ersetzt. Im wesentlichen wird ein SQL Statement zusammengestückelt.

  while Sql.Connection.InTransaction do
    Sleep(1);
  Sql.Execute;

  // Puffer zur Änderungserkennung für diesen Fetch-Aufruf speichern (einziger schreibender Zugriff auf Group)
  group.Buffer := Buf;
  group.FirstRun := false;
end;

// Wird vom MainForm aufgerufen, wenn OnSocketRead die geforderte Datenmenge gelesen hat
procedure TFetchThread.SocketDataReady(aBuffer: TSockBuffer);
begin
  ReplyBuffer := aBuffer;
  NewDataArrived := true;
end;

// Das hier macht der ominöse Cyclethread: Er zählt in den Fetchgruppen einen Zeitwert runter,
// anhand dessen ermittelt wird welche Anfrage als nächste ansteht.
procedure TCycleThread.Execute;
var
  i: Integer;
  tickDelta: Int64;
begin
  repeat
    tickDelta := GetTickCount-LastTickCount;
    LastTickCount := GetTickCount;
    for i := 0 to Items.Count-1 do
      Items[i].CurrentCycleTime := Items[i].CurrentCycleTime - tickDelta;
    Sleep(1);
  until Terminated;
end;
Was mich fuchst ist, dass das Erzeugen des DBEntryThreads ab und an knallt (so im Schnitt alle 20 Mal ein Mal, die Zykluszeit ist bei 2 Sekunden), obwohl der Konstruktor selbst keine AV erzeugt (getestet via try..except drum rum und bei Exception Logfile machen, was nie geschah). Das try..Except um den Aufruf des Konstruktors dagegen läuft wie gesagt ab und an auf den Hammer, ich komme beim Debuggen nur nicht an die genaue fehlerhafte Zeile, da diese sich nicht explizit in meinem Code zu befinden scheint.
Ich hab mir ein wenig Sorgen um diese Groups gemacht, da diese ja ein Feld des FetchThreads sind, und sowohl vom Cyclethread als auch vom DBEntryThread dort hineingegriffen wird. Jedoch werden in beiden nur elementare Operationen ausgeführt, und die Struktur der Liste bleibt nach dem Konstruktor von TFetchThread für den Rest des Ablaufs immer gleich.
Was hab ich hier nicht bedacht? Wäre prima, wenn trotz der Länge des Teils jemandem was auffällt. Danke schon mal!

\\Edit:
Hier mal ein Auszug aus meinem Fehlerlog (das schreibe ich im except-Teil der Methode MakeDBEntriesForGroup(), das ist hier der Länge wegen nicht im Code):
Code:
25:07:2011 12:46:17:625 : Zugriffsverletzung bei Adresse 00000000. Lesen von Adresse 00000000 -> TDBEntryThread.Create(Qry.Connection, group, buf);
25:07:2011 12:46:31:859 : Zugriffsverletzung bei Adresse 0040399A in Modul 'Server.exe'. Lesen von Adresse FFFFFFFF -> TDBEntryThread.Create(Qry.Connection, group, buf);
25:07:2011 12:47:00:953 : Zugriffsverletzung bei Adresse 0040399A in Modul 'Server.exe'. Lesen von Adresse FFFFFFFF -> TDBEntryThread.Create(Qry.Connection, group, buf);
25:07:2011 12:47:04:562 : Zugriffsverletzung bei Adresse 0040399A in Modul 'Server.exe'. Lesen von Adresse FFFFFFFF -> TDBEntryThread.Create(Qry.Connection, group, buf);
25:07:2011 12:47:21:609 : Zugriffsverletzung bei Adresse 00004244. Lesen von Adresse 00004244 -> TDBEntryThread.Create(Qry.Connection, group, buf);
25:07:2011 12:48:00:421 : Zugriffsverletzung bei Adresse 0040399A in Modul 'Server.exe'. Lesen von Adresse FFFFFFFF -> TDBEntryThread.Create(Qry.Connection, group, buf);
25:07:2011 12:48:02:859 : Zugriffsverletzung bei Adresse 00004244. Lesen von Adresse 00004244 -> TDBEntryThread.Create(Qry.Connection, group, buf);
25:07:2011 12:49:27:796 : Zugriffsverletzung bei Adresse 00000000. Lesen von Adresse 00000000 -> TDBEntryThread.Create(Qry.Connection, group, buf);
25:07:2011 12:49:45:890 : Zugriffsverletzung bei Adresse 00000000. Lesen von Adresse 00000000 -> TDBEntryThread.Create(Qry.Connection, group, buf);
25:07:2011 12:49:49:937 : Zugriffsverletzung bei Adresse 00000000. Lesen von Adresse 00000000 -> TDBEntryThread.Create(Qry.Connection, group, buf);
25:07:2011 12:49:53:203 : Zugriffsverletzung bei Adresse 0040399A in Modul 'Server.exe'. Lesen von Adresse FFFFFFFF -> TDBEntryThread.Create(Qry.Connection, group, buf);
25:07:2011 12:49:53:984 : Zugriffsverletzung bei Adresse 0040399A in Modul 'Server.exe'. Lesen von Adresse FFFFFFFF -> TDBEntryThread.Create(Qry.Connection, group, buf);
25:07:2011 12:50:00:500 : Zugriffsverletzung bei Adresse 0040399A in Modul 'Server.exe'. Lesen von Adresse FFFFFFFF -> TDBEntryThread.Create(Qry.Connection, group, buf);
25:07:2011 12:50:06:125 : Zugriffsverletzung bei Adresse 00000000. Lesen von Adresse 00000000 -> TDBEntryThread.Create(Qry.Connection, group, buf);
25:07:2011 12:50:07:796 : Zugriffsverletzung bei Adresse 0040399A in Modul 'Server.exe'. Lesen von Adresse FFFFFFFF -> TDBEntryThread.Create(Qry.Connection, group, buf);
25:07:2011 12:50:08:140 : Zugriffsverletzung bei Adresse 00000000. Lesen von Adresse 00000000 -> TDBEntryThread.Create(Qry.Connection, group, buf);
25:07:2011 12:50:12:203 : Zugriffsverletzung bei Adresse 00000000. Lesen von Adresse 00000000 -> TDBEntryThread.Create(Qry.Connection, group, buf);
25:07:2011 12:50:44:234 : Zugriffsverletzung bei Adresse 0040399A in Modul 'Server.exe'. Lesen von Adresse FFFFFFFF -> TDBEntryThread.Create(Qry.Connection, group, buf);
25:07:2011 12:50:52:703 : Zugriffsverletzung bei Adresse 0040399A in Modul 'Server.exe'. Lesen von Adresse FFFFFFFF -> TDBEntryThread.Create(Qry.Connection, group, buf);
25:07:2011 12:51:52:156 : Zugriffsverletzung bei Adresse 00000000. Lesen von Adresse 00000000 -> TDBEntryThread.Create(Qry.Connection, group, buf);
25:07:2011 12:52:03:109 : Zugriffsverletzung bei Adresse 00000000. Lesen von Adresse 00000000 -> TDBEntryThread.Create(Qry.Connection, group, buf);
25:07:2011 12:54:52:859 : Zugriffsverletzung bei Adresse 00004244. Lesen von Adresse 00004244 -> TDBEntryThread.Create(Qry.Connection, group, buf);
25:07:2011 12:58:24:640 : Zugriffsverletzung bei Adresse 0040399A in Modul 'Server.exe'. Lesen von Adresse FFFFFFFF -> TDBEntryThread.Create(Qry.Connection, group, buf);
"When one person suffers from a delusion, it is called insanity. When a million people suffer from a delusion, it is called religion." (Richard Dawkins)

Geändert von Medium (25. Jul 2011 um 13:01 Uhr)
  Mit Zitat antworten Zitat