* Rework packet handling

This commit is contained in:
Michaël Van Canneyt 2021-08-11 21:53:00 +02:00
parent 5221388659
commit 776790d014

View File

@ -362,8 +362,8 @@ type
TDataPacketFormat = (dfBinary,dfXML,dfXMLUTF8,dfAny,dfDefault); TDataPacketFormat = (dfBinary,dfXML,dfXMLUTF8,dfAny,dfDefault);
TDatapacketReaderClass = class of TDatapacketReader; TDataPacketHandlerClass = class of TDataPacketHandler;
TDataPacketReader = class(TObject) TDataPacketHandler = class(TObject)
FDataSet: TCustomBufDataset; FDataSet: TCustomBufDataset;
FStream : TStream; FStream : TStream;
protected protected
@ -398,6 +398,8 @@ type
// Checks if the provided stream is of the right format for this class // Checks if the provided stream is of the right format for this class
class function RecognizeStream(AStream : TStream) : boolean; virtual; abstract; class function RecognizeStream(AStream : TStream) : boolean; virtual; abstract;
end; end;
TDataPacketReaderClass = TDataPacketHandlerClass;
TDataPacketReader = TDataPacketHandler;
{ TFpcBinaryDatapacketReader } { TFpcBinaryDatapacketReader }
@ -419,7 +421,7 @@ type
null fields are not stored (see: null bitmap) null fields are not stored (see: null bitmap)
} }
TFpcBinaryDatapacketReader = class(TDataPacketReader) TFpcBinaryDatapacketHandler = class(TDataPacketHandler)
private private
const const
FpcBinaryIdent1 = 'BinBufDataset'; // Old version 1; support for transient period; FpcBinaryIdent1 = 'BinBufDataset'; // Old version 1; support for transient period;
@ -446,6 +448,7 @@ type
procedure FinalizeStoreRecords; override; procedure FinalizeStoreRecords; override;
class function RecognizeStream(AStream : TStream) : boolean; override; class function RecognizeStream(AStream : TStream) : boolean; override;
end; end;
TFpcBinaryDatapacketReader = TFpcBinaryDatapacketHandler;
{ TCustomBufDataset } { TCustomBufDataset }
@ -502,7 +505,7 @@ type
FFileName: TFileName; FFileName: TFileName;
FReadFromFile : boolean; FReadFromFile : boolean;
FFileStream : TFileStream; FFileStream : TFileStream;
FDatasetReader : TDataPacketReader; FPacketHandler : TDataPacketReader;
FMaxIndexesCount: integer; FMaxIndexesCount: integer;
FDefaultIndex, FDefaultIndex,
FCurrentIndexDef : TBufDatasetIndex; FCurrentIndexDef : TBufDatasetIndex;
@ -537,8 +540,8 @@ type
function GetFieldSize(FieldDef : TFieldDef) : longint; function GetFieldSize(FieldDef : TFieldDef) : longint;
procedure CalcRecordSize; procedure CalcRecordSize;
function IntAllocRecordBuffer: TRecordBuffer; function IntAllocRecordBuffer: TRecordBuffer;
procedure IntLoadFieldDefsFromFile; procedure IntLoadFieldDefsFromPacket(aReader : TDataPacketReader);
procedure IntLoadRecordsFromFile; procedure IntLoadRecordsFromPacket(aReader : TDataPacketReader);
function GetCurrentBuffer: TRecordBuffer; function GetCurrentBuffer: TRecordBuffer;
procedure CurrentRecordToBuffer(Buffer: TRecordBuffer); procedure CurrentRecordToBuffer(Buffer: TRecordBuffer);
function LoadBuffer(Buffer : TRecordBuffer): TGetResult; function LoadBuffer(Buffer : TRecordBuffer): TGetResult;
@ -1372,12 +1375,7 @@ end;
procedure TCustomBufDataset.InternalInitFieldDefs; procedure TCustomBufDataset.InternalInitFieldDefs;
begin begin
if FileName<>'' then // Do nothing
begin
IntLoadFieldDefsFromFile;
FreeAndNil(FDatasetReader);
FreeAndNil(FFileStream);
end;
end; end;
procedure TCustomBufDataset.InitUserIndexes; procedure TCustomBufDataset.InitUserIndexes;
@ -1393,63 +1391,84 @@ end;
procedure TCustomBufDataset.InternalOpen; procedure TCustomBufDataset.InternalOpen;
var IndexNr : integer; var
i : integer; IndexNr : integer;
i : integer;
aPacketReader : TDataPacketReader;
aStream : TFileStream;
begin begin
if assigned(FDatasetReader) or (FileName<>'') then aPacketReader:=Nil;
IntLoadFieldDefsFromFile; aStream:=Nil;
try
// This checks if the dataset is actually created (by calling CreateDataset, if assigned(FPacketHandler) or (FileName<>'') then
// or reading from a stream in some other way implemented by a descendent)
// If there are less fields than FieldDefs we know for sure that the dataset
// is not (correctly) created.
// If there are constant expressions in the select statement (for PostgreSQL)
// they are of type ftUnknown (in FieldDefs), and are not created (in Fields).
// So Fields.Count < FieldDefs.Count in this case
// See mantis #22030
// if Fields.Count<FieldDefs.Count then
if (Fields.Count = 0) or (FieldDefs.Count=0) then
DatabaseError(SErrNoDataset);
// search for autoinc field
FAutoIncField:=nil;
if FAutoIncValue>-1 then
begin
for i := 0 to Fields.Count-1 do
if Fields[i] is TAutoIncField then
begin begin
FAutoIncField := TAutoIncField(Fields[i]); aPacketReader:=FPacketHandler;
Break; if FileName<>'' then
begin
aStream := TFileStream.Create(FileName, fmOpenRead);
aPacketReader := GetPacketReader(dfDefault, aStream);
end;
IntLoadFieldDefsFromPacket(aPacketReader);
end; end;
// This checks if the dataset is actually created (by calling CreateDataset,
// or reading from a stream in some other way implemented by a descendent)
// If there are less fields than FieldDefs we know for sure that the dataset
// is not (correctly) created.
// If there are constant expressions in the select statement (for PostgreSQL)
// they are of type ftUnknown (in FieldDefs), and are not created (in Fields).
// So Fields.Count < FieldDefs.Count in this case
// See mantis #22030
// if Fields.Count<FieldDefs.Count then
if (Fields.Count = 0) or (FieldDefs.Count=0) then
DatabaseError(SErrNoDataset);
// search for autoinc field
FAutoIncField:=nil;
if FAutoIncValue>-1 then
begin
for i := 0 to Fields.Count-1 do
if Fields[i] is TAutoIncField then
begin
FAutoIncField := TAutoIncField(Fields[i]);
Break;
end;
end;
InitDefaultIndexes;
InitUserIndexes;
If FIndexName<>'' then
FCurrentIndexDef:=TBufDatasetIndex(FIndexes.Find(FIndexName))
else if (FIndexFieldNames<>'') then
BuildCustomIndex;
CalcRecordSize;
FBRecordCount := 0;
for IndexNr:=0 to FIndexes.Count-1 do
if Assigned(BufIndexdefs[IndexNr]) then
With BufIndexes[IndexNr] do
InitialiseSpareRecord(IntAllocRecordBuffer);
FAllPacketsFetched := False;
FOpen:=True;
// parse filter expression
ParseFilter(Filter);
if assigned(aPacketReader) then
IntLoadRecordsFromPacket(aPacketReader);
finally
// We created reader locally here.
if assigned(aStream) then
FreeAndNil(aPacketReader);
FreeAndNil(aStream);
end; end;
InitDefaultIndexes;
InitUserIndexes;
If FIndexName<>'' then
FCurrentIndexDef:=TBufDatasetIndex(FIndexes.Find(FIndexName))
else if (FIndexFieldNames<>'') then
BuildCustomIndex;
CalcRecordSize;
FBRecordCount := 0;
for IndexNr:=0 to FIndexes.Count-1 do
if Assigned(BufIndexdefs[IndexNr]) then
With BufIndexes[IndexNr] do
InitialiseSpareRecord(IntAllocRecordBuffer);
FAllPacketsFetched := False;
FOpen:=True;
// parse filter expression
ParseFilter(Filter);
if assigned(FDatasetReader) then IntLoadRecordsFromFile;
end; end;
procedure TCustomBufDataset.DoBeforeClose; procedure TCustomBufDataset.DoBeforeClose;
@ -2307,7 +2326,7 @@ end;
class function TCustomBufDataset.DefaultPacketClass: TDataPacketReaderClass; class function TCustomBufDataset.DefaultPacketClass: TDataPacketReaderClass;
begin begin
Result:=TFpcBinaryDatapacketReader; Result:=TFpcBinaryDatapacketHandler;
end; end;
function TCustomBufDataset.CreateDefaultPacketReader(aStream : TStream): TDataPacketReader; function TCustomBufDataset.CreateDefaultPacketReader(aStream : TStream): TDataPacketReader;
@ -3204,10 +3223,10 @@ begin
APacketReader := CreateDefaultPacketReader(AStream) APacketReader := CreateDefaultPacketReader(AStream)
else if GetRegisterDatapacketReader(AStream, fmt, APacketReaderReg) then else if GetRegisterDatapacketReader(AStream, fmt, APacketReaderReg) then
APacketReader := APacketReaderReg.ReaderClass.Create(Self, AStream) APacketReader := APacketReaderReg.ReaderClass.Create(Self, AStream)
else if TFpcBinaryDatapacketReader.RecognizeStream(AStream) then else if TFpcBinaryDatapacketHandler.RecognizeStream(AStream) then
begin begin
AStream.Seek(0, soFromBeginning); AStream.Seek(0, soFromBeginning);
APacketReader := TFpcBinaryDatapacketReader.Create(Self, AStream) APacketReader := TFpcBinaryDatapacketHandler.Create(Self, AStream)
end end
else else
DatabaseError(SStreamNotRecognised,Self); DatabaseError(SStreamNotRecognised,Self);
@ -3451,11 +3470,11 @@ end;
procedure TCustomBufDataset.SetDatasetPacket(AReader: TDataPacketReader); procedure TCustomBufDataset.SetDatasetPacket(AReader: TDataPacketReader);
begin begin
FDatasetReader := AReader; FPacketHandler := AReader;
try try
Open; Open;
finally finally
FDatasetReader := nil; FPacketHandler := nil;
end; end;
end; end;
@ -3487,7 +3506,7 @@ procedure TCustomBufDataset.GetDatasetPacket(AWriter: TDataPacketReader);
FFilterBuffer:=AUpdBuffer.OldValuesBuffer; FFilterBuffer:=AUpdBuffer.OldValuesBuffer;
// OldValuesBuffer is nil if the record is either inserted or inserted and then deleted // OldValuesBuffer is nil if the record is either inserted or inserted and then deleted
if assigned(FFilterBuffer) then if assigned(FFilterBuffer) then
FDatasetReader.StoreRecord(AThisRowState,FCurrentUpdateBuffer); aWriter.StoreRecord(AThisRowState,FCurrentUpdateBuffer);
end; end;
procedure HandleUpdateBuffersFromRecord(AFindNext : boolean; ARecBookmark : TBufBookmark; var ARowState: TRowState); procedure HandleUpdateBuffersFromRecord(AFindNext : boolean; ARecBookmark : TBufBookmark; var ARowState: TRowState);
@ -3520,13 +3539,11 @@ var ScrollResult : TGetResult;
RowState : TRowState; RowState : TRowState;
begin begin
FDatasetReader := AWriter; // CheckActive;
ABookMark:=@ATBookmark;
aWriter.StoreFieldDefs(FAutoIncValue);
SavedState:=SetTempState(dsFilter);
try try
// CheckActive;
ABookMark:=@ATBookmark;
FDatasetReader.StoreFieldDefs(FAutoIncValue);
SavedState:=SetTempState(dsFilter);
ScrollResult:=CurrentIndexBuf.ScrollFirst; ScrollResult:=CurrentIndexBuf.ScrollFirst;
while ScrollResult=grOK do while ScrollResult=grOK do
begin begin
@ -3537,9 +3554,9 @@ begin
// now store current record // now store current record
FFilterBuffer:=CurrentIndexBuf.CurrentBuffer; FFilterBuffer:=CurrentIndexBuf.CurrentBuffer;
if RowState=[] then if RowState=[] then
FDatasetReader.StoreRecord([]) aWriter.StoreRecord([])
else else
FDatasetReader.StoreRecord(RowState,FCurrentUpdateBuffer); aWriter.StoreRecord(RowState,FCurrentUpdateBuffer);
ScrollResult:=CurrentIndexBuf.ScrollForward; ScrollResult:=CurrentIndexBuf.ScrollForward;
if ScrollResult<>grOK then if ScrollResult<>grOK then
@ -3551,12 +3568,9 @@ begin
// There could be an update buffer linked to the last (spare) record // There could be an update buffer linked to the last (spare) record
CurrentIndexBuf.StoreSpareRecIntoBookmark(ABookmark); CurrentIndexBuf.StoreSpareRecIntoBookmark(ABookmark);
HandleUpdateBuffersFromRecord(False,ABookmark^,RowState); HandleUpdateBuffersFromRecord(False,ABookmark^,RowState);
aWriter.FinalizeStoreRecords;
RestoreState(SavedState);
FDatasetReader.FinalizeStoreRecords;
finally finally
FDatasetReader := nil; RestoreState(SavedState);
end; end;
end; end;
@ -3586,7 +3600,7 @@ begin
else if GetRegisterDatapacketReader(Nil,fmt,APacketReaderReg) then else if GetRegisterDatapacketReader(Nil,fmt,APacketReaderReg) then
APacketWriter := APacketReaderReg.ReaderClass.Create(Self, AStream) APacketWriter := APacketReaderReg.ReaderClass.Create(Self, AStream)
else if fmt = dfBinary then else if fmt = dfBinary then
APacketWriter := TFpcBinaryDatapacketReader.Create(Self, AStream) APacketWriter := TFpcBinaryDatapacketHandler.Create(Self, AStream)
else else
DatabaseError(SNoReaderClassRegistered,Self); DatabaseError(SNoReaderClassRegistered,Self);
try try
@ -3685,25 +3699,19 @@ begin
Result := -1; Result := -1;
end; end;
procedure TCustomBufDataset.IntLoadFieldDefsFromFile; procedure TCustomBufDataset.IntLoadFieldDefsFromPacket(aReader : TDataPacketReader);
begin begin
FReadFromFile := True; FReadFromFile := True;
if not assigned(FDatasetReader) then
begin
FFileStream := TFileStream.Create(FileName, fmOpenRead);
FDatasetReader := GetPacketReader(dfDefault, FFileStream);
end;
FieldDefs.Clear; FieldDefs.Clear;
FDatasetReader.LoadFieldDefs(FAutoIncValue); aReader.LoadFieldDefs(FAutoIncValue);
if DefaultFields then if DefaultFields then
CreateFields CreateFields
else else
BindFields(true); BindFields(true);
end; end;
procedure TCustomBufDataset.IntLoadRecordsFromFile; procedure TCustomBufDataset.IntLoadRecordsFromPacket(aReader : TDataPacketReader);
var var
SavedState : TDataSetState; SavedState : TDataSetState;
@ -3715,12 +3723,12 @@ var
begin begin
CheckBiDirectional; CheckBiDirectional;
DefIdx:=DefaultBufferIndex; DefIdx:=DefaultBufferIndex;
FDatasetReader.InitLoadRecords; aReader.InitLoadRecords;
SavedState:=SetTempState(dsFilter); SavedState:=SetTempState(dsFilter);
while FDatasetReader.GetCurrentRecord do while aReader.GetCurrentRecord do
begin begin
ARowState := FDatasetReader.GetRecordRowState(AUpdOrder); ARowState := aReader.GetRecordRowState(AUpdOrder);
if rsvOriginal in ARowState then if rsvOriginal in ARowState then
begin begin
if length(FUpdateBuffer) < (AUpdOrder+1) then if length(FUpdateBuffer) < (AUpdOrder+1) then
@ -3731,12 +3739,12 @@ begin
FFilterBuffer:=IntAllocRecordBuffer; FFilterBuffer:=IntAllocRecordBuffer;
fillchar(FFilterBuffer^,FNullmaskSize,0); fillchar(FFilterBuffer^,FNullmaskSize,0);
FUpdateBuffer[FCurrentUpdateBuffer].OldValuesBuffer := FFilterBuffer; FUpdateBuffer[FCurrentUpdateBuffer].OldValuesBuffer := FFilterBuffer;
FDatasetReader.RestoreRecord; aReader.RestoreRecord;
FDatasetReader.GotoNextRecord; aReader.GotoNextRecord;
if not FDatasetReader.GetCurrentRecord then if not aReader.GetCurrentRecord then
DatabaseError(SStreamNotRecognised,Self); DatabaseError(SStreamNotRecognised,Self);
ARowState := FDatasetReader.GetRecordRowState(AUpdOrder); ARowState := aReader.GetRecordRowState(AUpdOrder);
if rsvUpdated in ARowState then if rsvUpdated in ARowState then
FUpdateBuffer[FCurrentUpdateBuffer].UpdateKind:= ukModify FUpdateBuffer[FCurrentUpdateBuffer].UpdateKind:= ukModify
else else
@ -3746,7 +3754,7 @@ begin
DefIdx.StoreSpareRecIntoBookmark(@FUpdateBuffer[FCurrentUpdateBuffer].BookmarkData); DefIdx.StoreSpareRecIntoBookmark(@FUpdateBuffer[FCurrentUpdateBuffer].BookmarkData);
fillchar(FFilterBuffer^,FNullmaskSize,0); fillchar(FFilterBuffer^,FNullmaskSize,0);
FDatasetReader.RestoreRecord; aReader.RestoreRecord;
DefIdx.AddRecord; DefIdx.AddRecord;
inc(FBRecordCount); inc(FBRecordCount);
end end
@ -3761,7 +3769,7 @@ begin
fillchar(FFilterBuffer^,FNullmaskSize,0); fillchar(FFilterBuffer^,FNullmaskSize,0);
FUpdateBuffer[FCurrentUpdateBuffer].OldValuesBuffer := FFilterBuffer; FUpdateBuffer[FCurrentUpdateBuffer].OldValuesBuffer := FFilterBuffer;
FDatasetReader.RestoreRecord; aReader.RestoreRecord;
FUpdateBuffer[FCurrentUpdateBuffer].UpdateKind:= ukDelete; FUpdateBuffer[FCurrentUpdateBuffer].UpdateKind:= ukDelete;
DefIdx.StoreSpareRecIntoBookmark(@FUpdateBuffer[FCurrentUpdateBuffer].BookmarkData); DefIdx.StoreSpareRecIntoBookmark(@FUpdateBuffer[FCurrentUpdateBuffer].BookmarkData);
@ -3777,7 +3785,7 @@ begin
begin begin
FFilterBuffer:=DefIdx.SpareBuffer; FFilterBuffer:=DefIdx.SpareBuffer;
fillchar(FFilterBuffer^,FNullmaskSize,0); fillchar(FFilterBuffer^,FNullmaskSize,0);
FDatasetReader.RestoreRecord; aReader.RestoreRecord;
if rsvInserted in ARowState then if rsvInserted in ARowState then
begin begin
if length(FUpdateBuffer) < (AUpdOrder+1) then if length(FUpdateBuffer) < (AUpdOrder+1) then
@ -3791,17 +3799,12 @@ begin
inc(FBRecordCount); inc(FBRecordCount);
end; end;
FDatasetReader.GotoNextRecord; aReader.GotoNextRecord;
end; end;
RestoreState(SavedState); RestoreState(SavedState);
DefIdx.SetToFirstRecord; DefIdx.SetToFirstRecord;
FAllPacketsFetched:=True; FAllPacketsFetched:=True;
if assigned(FFileStream) then
begin
FreeAndNil(FFileStream);
FreeAndNil(FDatasetReader);
end;
// rebuild indexes // rebuild indexes
BuildIndexes; BuildIndexes;
@ -3899,7 +3902,7 @@ end;
function TCustomBufDataset.IsReadFromPacket: Boolean; function TCustomBufDataset.IsReadFromPacket: Boolean;
begin begin
Result := (FDatasetReader<>nil) or (FFileName<>'') or FReadFromFile; Result := (FPacketHandler<>nil) or (FFileName<>'') or FReadFromFile;
end; end;
procedure TCustomBufDataset.ParseFilter(const AFilter: string); procedure TCustomBufDataset.ParseFilter(const AFilter: string);
@ -4320,15 +4323,15 @@ begin
end; end;
{ TFpcBinaryDatapacketReader } { TFpcBinaryDatapacketHandler }
constructor TFpcBinaryDatapacketReader.Create(ADataSet: TCustomBufDataset; AStream: TStream); constructor TFpcBinaryDatapacketHandler.Create(ADataSet: TCustomBufDataset; AStream: TStream);
begin begin
inherited; inherited;
FVersion := 20; // default version 2.0 FVersion := 20; // default version 2.0
end; end;
procedure TFpcBinaryDatapacketReader.LoadFieldDefs(var AnAutoIncValue: integer); procedure TFpcBinaryDatapacketHandler.LoadFieldDefs(var AnAutoIncValue: integer);
var FldCount : word; var FldCount : word;
i : integer; i : integer;
@ -4367,7 +4370,7 @@ begin
SetLength(FNullBitmap, FNullBitmapSize); SetLength(FNullBitmap, FNullBitmapSize);
end; end;
procedure TFpcBinaryDatapacketReader.StoreFieldDefs(AnAutoIncValue: integer); procedure TFpcBinaryDatapacketHandler.StoreFieldDefs(AnAutoIncValue: integer);
var i : integer; var i : integer;
begin begin
Stream.Write(FpcBinaryIdent2[1], length(FpcBinaryIdent2)); Stream.Write(FpcBinaryIdent2[1], length(FpcBinaryIdent2));
@ -4393,18 +4396,18 @@ begin
SetLength(FNullBitmap, FNullBitmapSize); SetLength(FNullBitmap, FNullBitmapSize);
end; end;
procedure TFpcBinaryDatapacketReader.InitLoadRecords; procedure TFpcBinaryDatapacketHandler.InitLoadRecords;
begin begin
// Do nothing // Do nothing
end; end;
function TFpcBinaryDatapacketReader.GetCurrentRecord: boolean; function TFpcBinaryDatapacketHandler.GetCurrentRecord: boolean;
var Buf : byte; var Buf : byte;
begin begin
Result := (Stream.Read(Buf,1)=1) and (Buf=$fe); Result := (Stream.Read(Buf,1)=1) and (Buf=$fe);
end; end;
function TFpcBinaryDatapacketReader.GetRecordRowState(out AUpdOrder : Integer) : TRowState; function TFpcBinaryDatapacketHandler.GetRecordRowState(out AUpdOrder : Integer) : TRowState;
var Buf : byte; var Buf : byte;
begin begin
Stream.Read(Buf,1); Stream.Read(Buf,1);
@ -4415,12 +4418,12 @@ begin
AUpdOrder := 0; AUpdOrder := 0;
end; end;
procedure TFpcBinaryDatapacketReader.GotoNextRecord; procedure TFpcBinaryDatapacketHandler.GotoNextRecord;
begin begin
// Do Nothing // Do Nothing
end; end;
procedure TFpcBinaryDatapacketReader.RestoreRecord; procedure TFpcBinaryDatapacketHandler.RestoreRecord;
var var
AField: TField; AField: TField;
i: integer; i: integer;
@ -4463,7 +4466,7 @@ begin
end; end;
end; end;
procedure TFpcBinaryDatapacketReader.StoreRecord(ARowState: TRowState; AUpdOrder : integer); procedure TFpcBinaryDatapacketHandler.StoreRecord(ARowState: TRowState; AUpdOrder : integer);
var var
AField: TField; AField: TField;
i: integer; i: integer;
@ -4513,12 +4516,12 @@ begin
end; end;
end; end;
procedure TFpcBinaryDatapacketReader.FinalizeStoreRecords; procedure TFpcBinaryDatapacketHandler.FinalizeStoreRecords;
begin begin
// Do nothing // Do nothing
end; end;
class function TFpcBinaryDatapacketReader.RecognizeStream(AStream: TStream): boolean; class function TFpcBinaryDatapacketHandler.RecognizeStream(AStream: TStream): boolean;
var s : string; var s : string;
begin begin
SetLength(s, 13); SetLength(s, 13);