lazarus-ccr/components/flashfiler/sourcelaz/ffsrsort.pas
2016-12-07 13:31:59 +00:00

729 lines
24 KiB
ObjectPascal

{*********************************************************}
{* FlashFiler: Sort Engine classes *}
{*********************************************************}
(* ***** BEGIN LICENSE BLOCK *****
* Version: MPL 1.1
*
* The contents of this file are subject to the Mozilla Public License Version
* 1.1 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* http://www.mozilla.org/MPL/
*
* Software distributed under the License is distributed on an "AS IS" basis,
* WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
* for the specific language governing rights and limitations under the
* License.
*
* The Original Code is TurboPower FlashFiler
*
* The Initial Developer of the Original Code is
* TurboPower Software
*
* Portions created by the Initial Developer are Copyright (C) 1996-2002
* the Initial Developer. All Rights Reserved.
*
* Contributor(s):
*
* ***** END LICENSE BLOCK ***** *)
{$I ffdefine.inc}
unit ffsrsort;
interface
uses
ffllbase,
fflldict,
ffsrcur,
ffsreng,
ffsrixhl;
const
ffcl_MergeSortBufferSize : Longint = ffcl_1MB;
ffcl_MergeOrder = 5;
{ # of merge files from which we are retrieving records at any one time. }
type
TffSrSortEngineClass = class of TffSrBaseSortEngine;
TffSrSortState = (ffspEmpty, ffspPutting, ffspGetting);
{ The following record type holds the information pertinent to each field
on which the records are being sorted. }
TffSrSortFieldInfo = packed record
fldDescriptor : TffFieldDescriptor;
fldInxHelper : TffSrIndexHelper;
fldLength : Longint;
fldNoCase : boolean;
fldOrderDir : TffOrderByDirection;
end;
PffSrSortArray = ^TffSrSortArray;
TffSrSortArray = array[0..ffcl_MaxIndexFlds] of TffSrSortFieldInfo;
PffSortBuffer = ^TffSortBuffer;
TffSortBuffer = array[0..ffcl_1MB] of byte;
TffSrBaseSortEngine = class(TffObject)
protected
bsDB : TffSrDatabase;
{-The database containing the sorted table. }
bsDict : TffDataDictionary;
{-The dictionary describing the sorted table. }
bsEngine : TffServerEngine;
{-The engine managing the database & table. }
bsNumFields : integer;
{-Number of fields on which sort is taking place. }
bsRecLen : Longint;
{-The length of each record. }
bsSortInfo : PffSrSortArray;
{-The set of sorting information required by the sorting engine. }
public
constructor Create(anEngine : TffServerEngine;
aDB : TffSrDatabase;
aFieldsArray : TffFieldList;
const aOrderByArray : TffOrderByArray;
const aNumFields : integer;
aDict : TffDataDictionary;
const aIndexID : integer); virtual;
{ Note: Creator is responsible for freeing the memory associated with
aFieldsArray & aOrderByArray. }
destructor Destroy; override;
function Get(aRecord : PffByteArray) : boolean; virtual; abstract;
function Put(aRecord : PffByteArray) : TffResult; virtual; abstract;
end;
{ This class performs a merge sort on a set of records. As methods are fed
to the engine via the Put method, this class places the records within a
buffer. When the buffer is full, the engine quick sorts the records and
writes them to a temporary cursor.
The merge sort does not occur until the Get method is first called.
When the Get method is first used to retrieve the sorted records, the
engine sorts the current buffer of records. It then ... }
TffSrMergeSortEngine = class(TffSrBaseSortEngine)
protected
msBuffer : PffSortBuffer;
{-The run buffer used to cache unsorted records during Put phase. }
msBufferOffset : Longint;
{-The current offset into the buffer. }
msCursorList : TffSrCursorList;
{-List of the cursor's containing the sorted buffers. }
msCursorOnRec : array[0..pred(ffcl_MergeOrder)] of boolean;
{-Each element in this array has a one-to-one correspondence with the
elements in msMergeCursor. If an element in this array is set to True
then the corresponding cursor is positioned on a record that is to be
used for comparison. If the element is set to False then the cursor
is not set on a record to be used for comparison. }
msMaxRecCount : Longint;
{-The maximum number of records that may be held in the buffer. }
msMergeCursor : array[0..pred(ffcl_MergeOrder)] of TffSrSimpleCursor;
{-Array of cursors used for merging. }
msMergeCursorCount : integer;
{-The number of cursors involved in one stage of merging. }
msOutputCursor : TffSrSimpleCursor;
{-Cursor to which merged records are written. }
msOutputStoreSize : TffWord32;
{-The calculated size for the output cursor's temporary storage file.
Calculated in msSetMergeCursors. }
msPivotBuffer : PffByteArray;
{-Holds the pivot element during the quick sort. }
msRecBuffer : PffByteArray;
{-Used to temporarily hold a record while it is being swapped with
another record. }
msRecCount : Longint;
{-The number of records currently in msBuffer. }
msRunIndex : Longint;
{-When a small number of records (i.e., fewer than can be stored in the
run buffer) are added to the engine, this variable serves as an index
into the run buffer during the Get phase. We retrieve the sorted
records from the run buffer instead of using any merge files. }
msState : TffSrSortState;
{-The state of the sort engine. }
msTotalCount : TffWord32;
{-The total number of records added to the sort engine. }
{ Protected methods }
function msCompRecs(PRec1, PRec2 : PffBytearray) : integer;
{-Used to compare two records. }
function msFinalizeBuffer(const WriteToCursor : boolean) : TffResult;
{-Called when the in-memory run buffer is ready to be sorted and written
to a temporary cursor. If the run buffer is to be written to a
temporary cursor, set WriteToCursor to True. }
procedure msGetNextRecord(aRecord : PffByteArray);
{-Finds the next record that should be written to the
output cursor. It pulls the records from a number of input cursors
that are being merged. }
procedure msMerge;
{-Merges all the temporary cursors until there are ffcl_MergeOrder or
fewer temporary cursors left. }
procedure msMergeCursors;
{-Used to merge a number of cursors into an output cursor. }
procedure msNextRecord(const aCursorIndex : integer);
{-Positions a specific merge cursor to its next record. If the merge
cursor reaches EOF then this routine closes the cursor and adjusts
the msMergeCursor array. }
procedure msSetMergeCursors;
{-Determines which cursors are to be used for merging. }
procedure msSortBuffer;
{-Uses non-recursive quick sort algorithm to sort the in-memory record
buffer. The quick sort algorithm calculates the Median Of Three method
to calculate the pivot element. }
procedure msSwapRecs(Rec1, Rec2 : Longint);
{-Used to swap two records within the in-memory buffer. }
public
constructor Create(anEngine : TffServerEngine;
aDB : TffSrDatabase;
aFieldsArray : TffFieldList;
const aOrderByArray : TffOrderByArray;
const aNumFields : integer;
aDict : TffDataDictionary;
const aIndexID : integer); override;
{ Note: Creator is responsible for freeing the memory associated with
aIndexHelperArray & aOrderByArray. }
destructor Destroy; override;
function Get(aRecord : PffByteArray) : boolean; override;
function Put(aRecord : PffByteArray) : TffResult; override;
end;
var
ffcSortEngineClass : TffSrSortEngineClass = TffSrMergeSortEngine;
{ The type of sort engine to be used by the server engine. }
implementation
uses
sysutils,
ffllexcp,
ffsrbase,
ffsrbde,
ffsrlock;
{===TffSrBaseSortEngine==============================================}
constructor TffSrBaseSortEngine.Create(anEngine : TffServerEngine;
aDB : TffSrDatabase;
aFieldsArray : TffFieldList;
const aOrderByArray : TffOrderByArray;
const aNumFields : integer;
aDict : TffDataDictionary;
const aIndexID : integer);
var
FldInx, Index : integer;
begin
inherited Create;
bsDB := aDB;
bsDict := aDict;
bsEngine := anEngine;
bsNumFields := aNumFields;
bsRecLen := aDict.RecordLength;
{ Build the set of sorting information. }
FFGetMem(bsSortInfo, SizeOf(TffSrSortFieldInfo) * bsNumFields);
for Index := 0 to pred(aNumFields) do begin
FldInx := aFieldsArray[Index];
with bsSortInfo^[Index] do begin
fldDescriptor := aDict.FieldDescriptor[FldInx]^;
fldInxHelper := aDict.IndexHelpers[aIndexID, Index];
fldLength := aDict.FieldLength[FldInx];
fldNoCase := aDict.IndexIsCaseInsensitive[aIndexID];
fldOrderDir := aOrderByArray[Index];
end;
end;
end;
{--------}
destructor TffSrBaseSortEngine.destroy;
begin
if assigned(bsSortInfo) then
FFFreeMem(bsSortInfo, SizeOf(TffSrSortFieldInfo) * bsNumFields);
inherited Destroy;
end;
{====================================================================}
{===TffSrMergeSortEngine=============================================}
constructor TffSrMergeSortEngine.Create(anEngine : TffServerEngine;
aDB : TffSrDatabase;
aFieldsArray : TffFieldList;
const aOrderByArray : TffOrderByArray;
const aNumFields : integer;
aDict : TffDataDictionary;
const aIndexID : integer);
begin
inherited Create(anEngine, aDB, aFieldsArray, aOrderByArray, aNumFields,
aDict, aIndexID);
FFGetMem(msBuffer, ffcl_MergeSortBufferSize);
FfGetMem(msPivotBuffer, bsRecLen);
FFGetMem(msRecBuffer, bsRecLen);
msBufferOffset := 0;
msCursorList := TffSrCursorList.Create;
msMaxRecCount := ffcl_MergeSortBufferSize div bsRecLen;
msOutputStoreSize := ffcl_MergeSortBufferSize * ffcl_MergeOrder;
{ Default value. Not really used. }
msRecCount := 0;
msState := ffspEmpty;
msTotalCount := 0;
end;
{--------}
destructor TffSrMergeSortEngine.Destroy;
var
aCursor : TffSrBaseCursor;
Index : Longint;
begin
if assigned(msBuffer) then
FFFreeMem(msBuffer, ffcl_MergeSortBufferSize);
if assigned(msPivotBuffer) then
FFFreeMem(msPivotBuffer, bsRecLen);
if assigned(msRecBuffer) then
FFFreeMem(msRecBuffer, bsRecLen);
if assigned(msCursorList) then begin
for Index := pred(msCursorList.CursorCount) downto 0 do begin
aCursor := msCursorList.Cursor[ftFromIndex, Index];
msCursorList.DeleteCursor(aCursor.CursorID);
end;
msCursorList.Free;
end;
inherited;
end;
{--------}
function TffSrMergeSortEngine.Get(aRecord : PffByteArray) : boolean;
var
aStatus : TffResult;
begin
Result := false;
{ Is this the first get? }
if msState <> ffspGetting then begin
{ Yes. }
msState := ffspGetting;
{ Any records in the run buffer? }
if msRecCount > 0 then begin
{ Yes. Sort the run buffer. Write to temp cursor only if we have
written other temporary cursors. This is a performance optimization.
If there aren't enough records to fill a run buffer then we will
just quick sort them and retrieve them from the run buffer. }
aStatus := msFinalizeBuffer(msCursorList.CursorCount > 0);
if aStatus <> DBIERR_NONE then
FFRaiseException(EffException, ffStrResServer, aStatus,
['TffSrMergeSortEngine.Get']);
{ Do we have some temporary cursors? }
if msCursorList.CursorCount > 0 then
{ Yes. Merge them until they are whittled down to ffcl_MergeOrder files
in number. }
msMerge
else
{ No. But we do have records in the run buffer. Init an index into the
run buffer. }
msRunIndex := 0;
end
{ Any records at all? }
else if msTotalCount = 0 then
{ No. Nothing to sort. }
Exit;
end;
{ Get next record from merge files? }
if msMergeCursorCount > 0 then begin
{ Yes. }
msGetNextRecord(aRecord);
Result := True;
end
else if msRunIndex < msRecCount then begin
{ No. Not enough records for a merge file. Retrieve the next record from
the run buffer. }
Move(msBuffer^[msRunIndex * bsRecLen], aRecord^, bsRecLen);
inc(msRunIndex);
Result := True;
end;
end;
{--------}
function TffSrMergeSortEngine.msCompRecs(PRec1, PRec2 : PffByteArray) : integer;
var
Fld1Null, Fld2Null : boolean;
Index : integer;
Offset : Longint;
SortInfo : TffSrSortFieldInfo;
begin
Result := 0;
Index := 0;
{ Compare each field until we see a non-zero result. }
while (Result = 0) and (Index < bsNumFields) do begin
SortInfo := bsSortInfo^[Index];
{ Is either field a null? }
Fld1Null := bsDict.IsRecordFieldNull(SortInfo.fldDescriptor.fdNumber, PRec1);
Fld2Null := bsDict.IsRecordFieldNull(SortInfo.fldDescriptor.fdNumber, PRec2);
if Fld1Null then begin
if Fld2Null then
Result := 0
else
Result := -1;
end
else if Fld2Null then
Result := 1
else begin
Offset := bsSortInfo^[Index].fldDescriptor.fdOffset;
Result := bsSortInfo^[Index].fldInxHelper.CompareKey
(PRec1^[Offset], PRec2^[Offset],
@bsSortInfo^[Index].fldDescriptor,
bsSortInfo^[Index].fldLength, bsSortInfo^[Index].fldNoCase);
end;
{ The compare function always compares in ascending fashion. If this is
to be ordered in descending fashion and our result is non-zero, flip
some bits. }
if bsSortInfo^[Index].fldOrderDir = ffobDescending then
Result := -Result;
inc(Index);
end;
end;
{--------}
function TffSrMergeSortEngine.msFinalizeBuffer(const WriteToCursor : boolean) : TffResult;
var
Cursor : TffSrSimpleCursor;
Index : Longint;
begin
Result := DBIERR_NONE;
{ Sort the buffer. }
msSortBuffer;
if WriteToCursor then begin
{ Write the records to a temporary file. }
Cursor := TffSrSimpleCursor.Create(bsEngine, bsDB, FFGetRemainingTime);
Cursor.Build('', bsDict, omReadWrite, smExclusive, false, true,
[fffaTemporary, fffaBLOBChainSafe], ffcl_MergeSortBufferSize); {!!.05}
Cursor.CloseTable := True;
for Index := 0 to pred(msRecCount) do begin
Result := Cursor.InsertRecord(@msBuffer^[Index * bsRecLen], ffsltNone);
if Result <> DBIERR_NONE then
Exit;
end;
{ Add this cursor to our list of temporary files. }
msCursorList.AddCursor(Cursor);
{ Zero out the buffer. }
FillChar(msBuffer^, ffcl_MergeSortBufferSize, 0);
msRecCount := 0;
end;
end;
{--------}
procedure TffSrMergeSortEngine.msGetNextRecord(aRecord : PffByteArray);
var
aResult : TffResult;
Index, Index2 : integer;
begin
{ Assumption: Each cursor in the merge is positioned on a record. If a cursor
reaches EOF then it is closed before we access it again. }
{ Get record for first cursor. }
aResult := msMergeCursor[0].GetRecord(aRecord, ffsltNone);
Index := 0;
{ Did an error occur? }
if (aResult <> DBIERR_NONE) then
{ Yes. Raise an exception. }
FFRaiseException(EffException, ffStrResServer, aResult,
['msGetNextRecord.1']);
{ Compare the records from the other cursors. }
for Index2 := 1 to pred(msMergeCursorCount) do begin
aResult := msMergeCursor[Index2].GetRecord(msRecBuffer, ffsltNone);
{ Did an error occur? }
if (aResult <> DBIERR_NONE) then
{ Yes. Raise an exception. }
FFRaiseException(EffException, ffStrResServer, aResult,
['msGetNextRecord.2']);
{ Should this record be before the current record? }
if msCompRecs(msRecBuffer, aRecord) < 0 then begin
{ Yes. Copy the record. }
Move(msRecBuffer^, aRecord^, bsRecLen);
Index := Index2;
end;
end;
{ By this point, we have found the next record. Move the cursor from which
the record was obtained to its next record. Note that this action may
result in the closing of the cursor. }
msNextRecord(Index);
end;
{--------}
procedure TffSrMergeSortEngine.msMerge;
begin
{ While we have more cursors to merge than the merge order, do some work. }
while msCursorList.CursorCount > ffcl_MergeOrder do begin
{ Get some cursors to merge. }
msSetMergeCursors;
{ Create an output cursor & add it to the cursor list. The records from
the merged cursors will go to the output cursor. }
msOutputCursor := TffSrSimpleCursor.Create(bsEngine, bsDB,
FFGetRemainingTime);
msOutputCursor.Build('', bsDict, omReadWrite, smExclusive,
false, true, [fffaTemporary, fffaBLOBChainSafe], {!!.05}
msOutputStoreSize); {!!.05}
msOutputCursor.CloseTable := True;
msCursorList.AddCursor(msOutputCursor);
{ Merge the input cursors into the output cursor. }
msMergeCursors;
end;
msSetMergeCursors;
end;
{--------}
procedure TffSrMergeSortEngine.msMergeCursors;
var
aRecord : PffByteArray;
aStatus : TffResult;
aStr : string;
begin
FFGetMem(aRecord, bsRecLen);
try
try
while True do begin
{ Find next record for output cursor. Did we find a record? }
msGetNextRecord(aRecord);
{ Send to output cursor. }
aStatus := msOutputCursor.InsertRecord(aRecord, ffsltNone);
if aStatus <> DBIERR_NONE then
FFRaiseException(EffException, ffStrResServer, aStatus,
['msMergeCursors']);
{ All records merged? }
if msMergeCursorCount = 0 then
break;
end;
except
on E:Exception do begin
aStr := E.message;
end;
end;
finally
FFFreeMem(aRecord, bsRecLen);
end;
end;
{--------}
procedure TffSrMergeSortEngine.msNextRecord(const aCursorIndex : integer);
var
aResult : TffResult;
begin
aResult := msMergeCursor[aCursorIndex].GetNextRecord(msRecBuffer, ffsltNone);
if aResult = DBIERR_EOF then begin
{ Close the cursor. }
msCursorList.DeleteCursor(msMergeCursor[aCursorIndex].CursorID);
{ Move the last cursor to this position. }
msMergeCursor[aCursorIndex] := msMergeCursor[pred(msMergeCursorCount)];
dec(msMergeCursorCount);
end;
end;
{--------}
procedure TffSrMergeSortEngine.msSetMergeCursors;
var
aCount : Longint;
aCursor : TffSrSimpleCursor;
RecsPerBlock : Longint;
begin
msMergeCursorCount := 0;
msOutputStoreSize := 0;
RecsPerBlock := (64 * 1024) div bsRecLen;
{ Obtain a merge cursor while we have not exceeded the merge order and
while we have not exceeded the number of temporary cursors. }
while (msMergeCursorCount < ffcl_MergeOrder) and
(msMergeCursorCount < msCursorList.CursorCount) do begin
inc(msMergeCursorCount);
aCursor := TffSrSimpleCursor(msCursorList.Cursor[ftFromIndex,
pred(msMergeCursorCount)]);
{ Position to first record in each cursor. }
aCursor.SetToBegin;
aCursor.GetNextRecord(msRecBuffer, ffsltNone);
msMergeCursor[pred(msMergeCursorCount)] := aCursor;
msCursorOnRec[pred(msMergeCursorCount)] := false;
aCursor.GetRecordCount(aCount);
{ Increment temp store size by # blocks needed to hold the data plus 2
blocks for header and data dictionary. }
inc(msOutputStoreSize, (((aCount div RecsPerBlock) + 1) * 64 * 1024) +
(2 * 64 * 1024));
end;
end;
{--------}
procedure TffSrMergeSortEngine.msSortBuffer;
const
MedianThreshold = 16;
StackSize = 32;
type
Stack = array[0..StackSize - 1] of Longint;
var
L : Longint; { The left edge, base zero. }
R : Longint; { The right edge, base zero. }
Pl : Longint; { Left edge within current partition, base zero. }
Pr : Longint; { Right edge within current partition, base zero. }
Pm : Longint; { Mid-point of current partition. }
PLen : Longint; { The size of the current partition. }
StackP : integer; { Stack pointer. }
LStack : Stack; { Pending partitions, left edge. }
RStack : Stack; { Pending partitions, right edge. }
begin
{ Initialize the stack. }
StackP := 0;
LStack[0] := 0;
RStack[0] := msRecCount - 1;
{ Repeatedly take top partition from the stack. }
repeat
{ Pop the stack. }
L := LStack[StackP];
R := RStack[StackP];
Dec(StackP);
{ Sort the current partition. }
repeat
Pl := L;
Pr := R;
PLen := Pr - Pl + 1;
{ Calculate the pivot element. }
Pm := Pl + (PLen shr 1);
if PLen >= MedianThreshold then begin
{ Sort elements P1, Pm, & Pr. }
if msCompRecs(@msBuffer^[Pm * bsRecLen], @msBuffer^[Pl * bsRecLen]) < 0 then
msSwapRecs(Pm, Pl);
if msCompRecs(@msBuffer^[Pr * bsRecLen], @msBuffer^[Pl * bsRecLen]) < 0 then
msSwapRecs(Pr, Pl);
if msCompRecs(@msBuffer^[Pr * bsRecLen], @msBuffer^[Pm * bsRecLen]) < 0 then
msSwapRecs(Pr, Pm);
{ Exchange Pm with Pr - 1 but use Pm's value as the pivot. }
msSwapRecs(Pm, Pr - 1);
Pm := Pr - 1;
{ Reduce range of swapping now that Pl and Pr are in the right
spots. }
inc(Pl);
dec(Pr, 2);
end;
{ Save the pivot element. }
Move(msBuffer^[Pm * bsRecLen], msPivotBuffer^, bsRecLen);
{ Swap items in sort order around the pivot. }
repeat
while msCompRecs(@msBuffer^[Pl * bsRecLen], msPivotBuffer) < 0 do
inc(Pl);
while msCompRecs(msPivotBuffer, @msBuffer^[Pr * bsRecLen]) < 0 do
dec(Pr);
{ Have we reached the pivot? }
if Pl = Pr then begin
Inc(Pl);
Dec(Pr);
end
else if Pl < Pr then begin
{ No. Swap elements around the pivot. }
msSwapRecs(Pl, Pr);
inc(Pl);
dec(Pr);
end;
until Pl > Pr;
{ Decide which partition to sort next. Which partition is bigger? }
if (Pr - L) < (R - Pl) then begin
{ Left partition is bigger. }
if Pl < R then begin
{ Stack the request for sorting right partition. }
inc(StackP);
LStack[StackP] := Pl;
RStack[StackP] := R;
end;
{ Continue sorting left partion. }
R := Pr;
end
else begin
{ Right partition is bigger. }
if L < Pr then begin
{ Stack the request for sorting left partition. }
inc(StackP);
LStack[StackP] := L;
RStack[StackP] := Pr;
end;
{ Continue sorting right partition. }
L := Pl;
end;
until L >= R;
until StackP < 0;
end;
{--------}
procedure TffSrMergeSortEngine.msSwapRecs(Rec1, Rec2 : Longint);
begin
Move(msBuffer^[Rec1 * bsRecLen], msRecBuffer^, bsRecLen);
Move(msBuffer^[Rec2 * bsRecLen], msBuffer^[Rec1 * bsRecLen], bsRecLen);
Move(msRecBuffer^, msBuffer^[Rec2 * bsRecLen], bsRecLen);
end;
{--------}
function TffSrMergeSortEngine.Put(aRecord : PffByteArray) : TffResult;
begin
Result := DBIERR_NONE;
{ Did we start retrieving? }
Assert(not (msState = ffspGetting));
msState := ffspPutting;
{ Is the buffer full? }
if msRecCount = msMaxRecCount then begin
Result := msFinalizeBuffer(True);
msBufferOffset := 0;
end;
if Result = DBIERR_NONE then begin
{ Add the record to the buffer. }
Move(aRecord^, msBuffer^[msBufferOffset], bsRecLen);
inc(msBufferOffset, bsRecLen);
inc(msRecCount);
inc(msTotalCount);
end;
end;
{====================================================================}
end.