Registriert seit: 23. Jan 2008
3.686 Beiträge
Delphi 2007 Enterprise
|
Die lieben Threads mal wieder, es Fehlert so rum
25. Jul 2011, 12:33
Aloah!
So ganz so grün bin ich in Sachen Multithreading ja eigentlich nicht - dachte ich - aber in meinem aktuellen Projekt hüpft mir gelegentlich dann doch die gelegentliche AV ins Gesicht.
Der Sinn des Teils ist: Es gibt eine Liste von Anfragen, die via TCP/ IP an ein anderes System gesendet werden. Da die Antworten dieses Systems keine Zuordnung zur gemachten Anfrage zulassen, muss auf die Antwort gewartet werden, bevor eine nächste geschickt werden kann. Die Anfragen werden in einem definierten Zyklus verschickt, so dass ich das in einen Thread ausgelagert habe, der immer wieder diese Anfragen abschickt wenn sie anstehen, und zurück gelieferte Antworten verarbeitet (=Daten in meine DB schreibt).
Ich versuche mal die grobe Struktur darzustellen:
Delphi-Quellcode:
unit MyThreads;
interface
type
TFetchThread = class(TThread)
private
FetchGroups: TFetchGroupList; // Eine List mit Instanzen von TFetchGroup, die sind nichts großartig spannendes. Reine Datenhalter.
Sock: TClientSocket;
Con: TUniConnection;
Qry: TUniQuery;
CycleThread: TCycleThread;
NewDataArrived: Boolean;
WaitingForReply: Boolean;
SockForm: TForm;
procedure IssueFetch(fetchGroupIndex: Integer);
procedure HandleFetchReply(aReplyBuffer: TSockBuffer);
procedure MakeDBEntriesForGroup(groupID: Integer; buf: TFetchEntryBuffer);
procedure InitIssueFetch;
protected
procedure Execute; override;
public
constructor Create(aSocket: TClientSocket; aSocketForm: TForm; aDBConnection: TUniConnection);
destructor Destroy; override;
procedure SocketDataReady(aBuffer: TSockBuffer);
end;
implementation
constructor TFetchThread.Create(aSocket: TClientSocket; aSocketForm: TForm; aDBConnection: TUniConnection);
var
i, m: Integer;
begin
inherited Create(true);
Sock := aSocket; // Socket kommt vom MainForm
SockForm := aSocketForm; // Das Handle brauche ich später auch
// Eigene Connection für den Thread erstellen, Daten von der übergebenen Connection nehmen
try
Con := TUniConnection.Create( nil);
Con.ProviderName := aDBConnection.ProviderName;
Con.Server := aDBConnection.Server;
Con.Database := aDBConnection.Database;
Con.Port := aDBConnection.Port;
Con.Username := aDBConnection.Username;
Con.Password := aDBConnection.Password;
Con.LoginPrompt := false;
Con.Connect;
except
Con.Free;
raise Exception.Create(' Mäh');
end;
// Thread-eigene Query
try
Qry := TUniQuery.Create( nil);
Qry.Connection := Con;
except
Qry.Free;
raise Exception.Create(' Muh');
end;
// Gruppen füllen, dies ist die einzige Stelle, an der an diesen strukturell etwas gemacht wird
FetchGroups := TFetchFunctions.MakeFetchGroups(Qry);
CycleThread := TCycleThread.Create;
CycleThread.Items := FetchGroups;
CycleThread.Resume;
self.Resume;
end;
// WaitingForReply heisst, es wurde eine Anfrage gesendet, die Antwort steht noch aus. In der Zeit darf nichts gemacht werden.
// NewDataArrived wird im SocketRead gesetzt, wenn die erwartete Menge Bytes angekommen und valide ist.
procedure TFetchThread.Execute;
var
i, k: Integer;
lowCycle: Integer;
begin
repeat
if not WaitingForReply then
begin
// Gruppe suchen, die aktualisiert werden muss (die, die am längsten überfällig ist)
lowCycle := 0;
k := -1;
for i := 0 to FetchGroups.Count-1 do
begin
if FetchGroups[i].CurrentCycleTime < lowCycle then
begin
lowCycle := FetchGroups[i].CurrentCycleTime;
k := i;
end;
end;
// Wurde eine gefunden, dann via Socket-Kompo auf dem MainForm die Anfrage senden
if (k >= 0) then
begin
IssueFetchID := k;
Synchronize(InitIssueFetch);
end;
end
else
begin
// Wird eine Antwort zu einer Anfrage erwartet, und es ist eine eingetroffen, diese nun verarbeiten.
if NewDataArrived then
HandleFetchReply(ReplyBuffer); // Der Buffer ist ein array[0..8191] of Byte, mehr kommt definitiv nicht
end;
Sleep(1);
until Terminated;
end;
procedure TFetchThread.HandleFetchReply(aReplyBuffer: TSockBuffer);
var
h: TFetchAnswerHeader; // ein record
isValid: Boolean;
buf: TFetchEntryBuffer; // array of Byte
begin
Move(aReplyBuffer[0], h, SizeOf(TFetchAnswerHeader));
// Header auf Validität prüfen
isValid := DiverseDinge;
if isValid then
begin
// ReplyGroupID wird in InitIssueFetch() gesetzt, und ist der Index der zuletzt angefragen Fetchgruppe in der Liste dieser
SetLength(buf, FetchGroups[ReplyGroupID].ByteCount);
Move(aReplyBuffer[SizeOf(TFetchAnswerHeader)], buf[0], FetchGroups[ReplyGroupID].ByteCount);
MakeDBEntriesForGroup(ReplyGroupID, buf);
end;
NewDataArrived := false;
WaitingForReply := false;
end;
// Mein Sorgenkind! Das erstellen des TDBEntryThreads knallt sporadisch, und zwar beim Erzeugen selbt.
// Der Konstruktur läuft sauber durch, das hab ich getestet, dennoch gibt's ab und an eine AV an unterschiedlichen
// Adressen. Mal 0, mal $FFFFFFFF, mal irgendwas im Codesegment, und dort immer leicht verschiedene.
// Das try..except hier ist nur ein Workaround, den ich gern los wäre. Zudem greift dies nicht immer, selten springt
// Delphi auch gleich ins CPU-Fenster, obwohl "bei Delphi-Exceptions stoppen" aus ist. Igitt!
procedure TFetchThread.MakeDBEntriesForGroup(groupID: Integer; buf: TFetchEntryBuffer);
var
group: TFetchGroup;
threadOkay: Boolean;
begin
group := FetchGroups[groupID];
threadOkay := false;
repeat
try
TDBEntryThread.Create(Qry.Connection, group, buf, SockForm.Handle);
threadOkay := true;
except
end;
Sleep(1);
until threadOkay;
end;
// Das ist besagter Konstruktor. Ein paar private Felder werden gesetzt, viel mehr nicht.
constructor TDBEntryThread.Create(aCon: TUniConnection; aGroup: TFetchGroup; aBuf: TFetchEntryBuffer; aWnd: HWND);
begin
inherited Create(true);
Con := aCon;
Sql := TUniSQL.Create( nil);
Sql.Connection := Con;
Group := aGroup;
Buf := aBuf;
Wnd := aWnd;
FreeOnTerminate := true;
Resume;
end;
// Und hier der zugehörige Workload
procedure TDBEntryThread.Execute;
begin
// Hier wird anhand von Infos aus der "Group" der Empfagspuffer interpretiert. Group wird dabei nur lesend angefasst.
// Da das recht viel ist, durch diesen Kommentar ersetzt. Im wesentlichen wird ein SQL Statement zusammengestückelt.
while Sql.Connection.InTransaction do
Sleep(1);
Sql.Execute;
// Puffer zur Änderungserkennung für diesen Fetch-Aufruf speichern (einziger schreibender Zugriff auf Group)
group.Buffer := Buf;
group.FirstRun := false;
end;
// Wird vom MainForm aufgerufen, wenn OnSocketRead die geforderte Datenmenge gelesen hat
procedure TFetchThread.SocketDataReady(aBuffer: TSockBuffer);
begin
ReplyBuffer := aBuffer;
NewDataArrived := true;
end;
// Das hier macht der ominöse Cyclethread: Er zählt in den Fetchgruppen einen Zeitwert runter,
// anhand dessen ermittelt wird welche Anfrage als nächste ansteht.
procedure TCycleThread.Execute;
var
i: Integer;
tickDelta: Int64;
begin
repeat
tickDelta := GetTickCount-LastTickCount;
LastTickCount := GetTickCount;
for i := 0 to Items.Count-1 do
Items[i].CurrentCycleTime := Items[i].CurrentCycleTime - tickDelta;
Sleep(1);
until Terminated;
end;
Was mich fuchst ist, dass das Erzeugen des DBEntryThreads ab und an knallt (so im Schnitt alle 20 Mal ein Mal, die Zykluszeit ist bei 2 Sekunden), obwohl der Konstruktor selbst keine AV erzeugt (getestet via try..except drum rum und bei Exception Logfile machen, was nie geschah). Das try..Except um den Aufruf des Konstruktors dagegen läuft wie gesagt ab und an auf den Hammer, ich komme beim Debuggen nur nicht an die genaue fehlerhafte Zeile, da diese sich nicht explizit in meinem Code zu befinden scheint.
Ich hab mir ein wenig Sorgen um diese Groups gemacht, da diese ja ein Feld des FetchThreads sind, und sowohl vom Cyclethread als auch vom DBEntryThread dort hineingegriffen wird. Jedoch werden in beiden nur elementare Operationen ausgeführt, und die Struktur der Liste bleibt nach dem Konstruktor von TFetchThread für den Rest des Ablaufs immer gleich.
Was hab ich hier nicht bedacht? Wäre prima, wenn trotz der Länge des Teils jemandem was auffällt. Danke schon mal!
\\Edit:
Hier mal ein Auszug aus meinem Fehlerlog (das schreibe ich im except-Teil der Methode MakeDBEntriesForGroup(), das ist hier der Länge wegen nicht im Code):
Code:
25:07:2011 12:46:17:625 : Zugriffsverletzung bei Adresse 00000000. Lesen von Adresse 00000000 -> TDBEntryThread.Create(Qry.Connection, group, buf);
25:07:2011 12:46:31:859 : Zugriffsverletzung bei Adresse 0040399A in Modul 'Server.exe'. Lesen von Adresse FFFFFFFF -> TDBEntryThread.Create(Qry.Connection, group, buf);
25:07:2011 12:47:00:953 : Zugriffsverletzung bei Adresse 0040399A in Modul 'Server.exe'. Lesen von Adresse FFFFFFFF -> TDBEntryThread.Create(Qry.Connection, group, buf);
25:07:2011 12:47:04:562 : Zugriffsverletzung bei Adresse 0040399A in Modul 'Server.exe'. Lesen von Adresse FFFFFFFF -> TDBEntryThread.Create(Qry.Connection, group, buf);
25:07:2011 12:47:21:609 : Zugriffsverletzung bei Adresse 00004244. Lesen von Adresse 00004244 -> TDBEntryThread.Create(Qry.Connection, group, buf);
25:07:2011 12:48:00:421 : Zugriffsverletzung bei Adresse 0040399A in Modul 'Server.exe'. Lesen von Adresse FFFFFFFF -> TDBEntryThread.Create(Qry.Connection, group, buf);
25:07:2011 12:48:02:859 : Zugriffsverletzung bei Adresse 00004244. Lesen von Adresse 00004244 -> TDBEntryThread.Create(Qry.Connection, group, buf);
25:07:2011 12:49:27:796 : Zugriffsverletzung bei Adresse 00000000. Lesen von Adresse 00000000 -> TDBEntryThread.Create(Qry.Connection, group, buf);
25:07:2011 12:49:45:890 : Zugriffsverletzung bei Adresse 00000000. Lesen von Adresse 00000000 -> TDBEntryThread.Create(Qry.Connection, group, buf);
25:07:2011 12:49:49:937 : Zugriffsverletzung bei Adresse 00000000. Lesen von Adresse 00000000 -> TDBEntryThread.Create(Qry.Connection, group, buf);
25:07:2011 12:49:53:203 : Zugriffsverletzung bei Adresse 0040399A in Modul 'Server.exe'. Lesen von Adresse FFFFFFFF -> TDBEntryThread.Create(Qry.Connection, group, buf);
25:07:2011 12:49:53:984 : Zugriffsverletzung bei Adresse 0040399A in Modul 'Server.exe'. Lesen von Adresse FFFFFFFF -> TDBEntryThread.Create(Qry.Connection, group, buf);
25:07:2011 12:50:00:500 : Zugriffsverletzung bei Adresse 0040399A in Modul 'Server.exe'. Lesen von Adresse FFFFFFFF -> TDBEntryThread.Create(Qry.Connection, group, buf);
25:07:2011 12:50:06:125 : Zugriffsverletzung bei Adresse 00000000. Lesen von Adresse 00000000 -> TDBEntryThread.Create(Qry.Connection, group, buf);
25:07:2011 12:50:07:796 : Zugriffsverletzung bei Adresse 0040399A in Modul 'Server.exe'. Lesen von Adresse FFFFFFFF -> TDBEntryThread.Create(Qry.Connection, group, buf);
25:07:2011 12:50:08:140 : Zugriffsverletzung bei Adresse 00000000. Lesen von Adresse 00000000 -> TDBEntryThread.Create(Qry.Connection, group, buf);
25:07:2011 12:50:12:203 : Zugriffsverletzung bei Adresse 00000000. Lesen von Adresse 00000000 -> TDBEntryThread.Create(Qry.Connection, group, buf);
25:07:2011 12:50:44:234 : Zugriffsverletzung bei Adresse 0040399A in Modul 'Server.exe'. Lesen von Adresse FFFFFFFF -> TDBEntryThread.Create(Qry.Connection, group, buf);
25:07:2011 12:50:52:703 : Zugriffsverletzung bei Adresse 0040399A in Modul 'Server.exe'. Lesen von Adresse FFFFFFFF -> TDBEntryThread.Create(Qry.Connection, group, buf);
25:07:2011 12:51:52:156 : Zugriffsverletzung bei Adresse 00000000. Lesen von Adresse 00000000 -> TDBEntryThread.Create(Qry.Connection, group, buf);
25:07:2011 12:52:03:109 : Zugriffsverletzung bei Adresse 00000000. Lesen von Adresse 00000000 -> TDBEntryThread.Create(Qry.Connection, group, buf);
25:07:2011 12:54:52:859 : Zugriffsverletzung bei Adresse 00004244. Lesen von Adresse 00004244 -> TDBEntryThread.Create(Qry.Connection, group, buf);
25:07:2011 12:58:24:640 : Zugriffsverletzung bei Adresse 0040399A in Modul 'Server.exe'. Lesen von Adresse FFFFFFFF -> TDBEntryThread.Create(Qry.Connection, group, buf);
"When one person suffers from a delusion, it is called insanity. When a million people suffer from a delusion, it is called religion." (Richard Dawkins)
Geändert von Medium (25. Jul 2011 um 13:01 Uhr)
|