lazcollections: TLazThreadedQueue refactor into threaded/none-threaded part

git-svn-id: trunk@63861 -
This commit is contained in:
martin 2020-09-03 08:21:02 +00:00
parent 144fb11c00
commit c5e7f1e250

View File

@ -36,22 +36,45 @@ type
class property DefaultSpinCount: integer read GetDefaultSpinCount write SetDefaultSpinCount;
end;
{ TThreadedQueue }
{ TLazFifoQueue }
generic TLazFifoQueue<T> = class
private
FList: array of T;
FQueueSize: integer;
FTotalItemsPopped: QWord;
FTotalItemsPushed: QWord;
function GetIsEmpty: Boolean;
function GetIsFull: Boolean;
public
constructor create(AQueueDepth: Integer = 10);
procedure Grow(ADelta: integer);
function PushItem(const AItem: T): Boolean; virtual;
function PopItem(out AItem: T): Boolean; virtual;
property QueueSize: integer read FQueueSize;
property TotalItemsPopped: QWord read FTotalItemsPopped;
property TotalItemsPushed: QWord read FTotalItemsPushed;
property IsEmpty: Boolean read GetIsEmpty;
property IsFull: Boolean read GetIsFull;
end;
{ TLazThreadedQueue }
generic TLazThreadedQueue<T> = class
protected
type
TLazTypedFifoQueue = specialize TLazFifoQueue<T>;
private
FMonitor: TLazMonitor;
FList: array of T;
FFifoList: TLazTypedFifoQueue;
FPushTimeout: Cardinal;
FPopTimeout: Cardinal;
FQueueSize: integer;
FTotalItemsPopped: QWord;
FTotalItemsPushed: QWord;
FHasRoomEvent: PRTLEvent;
FHasItemEvent: PRTLEvent;
FShutDown: boolean;
function GetQueueSize: integer; inline;
function GetTotalItemsPopped: QWord; inline;
function GetTotalItemsPushed: QWord; inline;
function TryPushItem(const AItem: T): boolean; inline;
function TryPopItem(out AItem: T): boolean; inline;
protected
@ -59,6 +82,7 @@ type
function TryPopItemUnprotected(out AItem: T): boolean;
procedure Lock;
procedure Unlock;
function CreateFifoQueue(AQueueDepth: Integer): TLazTypedFifoQueue; virtual;
public
constructor create(AQueueDepth: Integer = 10; PushTimeout: cardinal = INFINITE; PopTimeout: cardinal = INFINITE);
destructor Destroy; override;
@ -67,9 +91,9 @@ type
function PopItem(out AItem: T): TWaitResult;
function PopItemTimeout(out AItem: T; Timeout: cardinal): TWaitResult;
procedure DoShutDown;
property QueueSize: integer read FQueueSize;
property TotalItemsPopped: QWord read FTotalItemsPopped;
property TotalItemsPushed: QWord read FTotalItemsPushed;
property QueueSize: integer read GetQueueSize;
property TotalItemsPopped: QWord read GetTotalItemsPopped;
property TotalItemsPushed: QWord read GetTotalItemsPushed;
property ShutDown: boolean read FShutDown;
end;
@ -150,6 +174,61 @@ begin
inherited Acquire;
end;
{ TLazFifoQueue }
function TLazFifoQueue.GetIsEmpty: Boolean;
begin
result := FTotalItemsPushed = FTotalItemsPopped;
end;
function TLazFifoQueue.GetIsFull: Boolean;
begin
result := FTotalItemsPushed - FTotalItemsPopped = FQueueSize;
end;
constructor TLazFifoQueue.create(AQueueDepth: Integer);
begin
Grow(AQueueDepth);
end;
procedure TLazFifoQueue.Grow(ADelta: integer);
var
NewList: array of T;
c: Integer;
i: QWord;
begin
c:=Max(FQueueSize + ADelta, Integer(FTotalItemsPushed - FTotalItemsPopped));
setlength(NewList, c);
i:=FTotalItemsPopped;
while i < FTotalItemsPushed do begin
NewList[i mod c] := FList[i mod FQueueSize];
inc(i);
end;
FList := NewList;
FQueueSize:=c;
end;
function TLazFifoQueue.PushItem(const AItem: T): Boolean;
begin
result := FTotalItemsPushed-FTotalItemsPopped<FQueueSize;
if result then
begin
FList[FTotalItemsPushed mod FQueueSize]:=AItem;
inc(FTotalItemsPushed);
end;
end;
function TLazFifoQueue.PopItem(out AItem: T): Boolean;
begin
result := FTotalItemsPushed>FTotalItemsPopped;
if result then
begin
AItem := FList[FTotalItemsPopped mod FQueueSize];
inc(FTotalItemsPopped);
end;
end;
{ TThreadedQueue }
function TLazThreadedQueue.TryPushItem(const AItem: T): boolean;
@ -162,6 +241,21 @@ begin
end;
end;
function TLazThreadedQueue.GetQueueSize: integer;
begin
Result := FFifoList.QueueSize;
end;
function TLazThreadedQueue.GetTotalItemsPopped: QWord;
begin
Result := FFifoList.TotalItemsPopped;
end;
function TLazThreadedQueue.GetTotalItemsPushed: QWord;
begin
Result := FFifoList.TotalItemsPushed;
end;
function TLazThreadedQueue.TryPopItem(out AItem: T): boolean;
begin
FMonitor.Enter;
@ -174,29 +268,23 @@ end;
function TLazThreadedQueue.TryPushItemUnprotected(const AItem: T): boolean;
begin
result := FTotalItemsPushed-FTotalItemsPopped<FQueueSize;
result := FFifoList.PushItem(AItem);
if result then
begin
FList[FTotalItemsPushed mod FQueueSize]:=AItem;
inc(FTotalItemsPushed);
RTLeventSetEvent(FHasItemEvent);
end;
RTLeventResetEvent(FHasRoomEvent);
if FTotalItemsPushed-FTotalItemsPopped<FQueueSize then
if not FFifoList.IsFull then
RTLeventSetEvent(FHasRoomEvent);
end;
function TLazThreadedQueue.TryPopItemUnprotected(out AItem: T): boolean;
begin
result := FTotalItemsPushed>FTotalItemsPopped;
result := FFifoList.PopItem(AItem);
if result then
begin
AItem := FList[FTotalItemsPopped mod FQueueSize];
inc(FTotalItemsPopped);
RTLeventSetEvent(FHasRoomEvent);
end;
RTLeventResetEvent(FHasItemEvent);
if FTotalItemsPushed > FTotalItemsPopped then
if not FFifoList.IsEmpty then
RTLeventSetEvent(FHasItemEvent);
end;
@ -210,9 +298,16 @@ begin
FMonitor.Leave;
end;
function TLazThreadedQueue.CreateFifoQueue(AQueueDepth: Integer
): TLazTypedFifoQueue;
begin
result := TLazTypedFifoQueue.create(AQueueDepth);
end;
constructor TLazThreadedQueue.create(AQueueDepth: Integer; PushTimeout: cardinal; PopTimeout: cardinal);
begin
FMonitor:=TLazMonitor.create;
FFifoList := CreateFifoQueue(AQueueDepth);
Grow(AQueueDepth);
FHasRoomEvent:=RTLEventCreate;
RTLeventSetEvent(FHasRoomEvent);
@ -227,6 +322,7 @@ begin
RTLeventdestroy(FHasRoomEvent);
RTLeventdestroy(FHasItemEvent);
FMonitor.Free;
FFifoList.Free;
inherited Destroy;
end;
@ -238,16 +334,7 @@ var
begin
FMonitor.Enter;
try
c:=Max(FQueueSize + ADelta, Integer(FTotalItemsPushed - FTotalItemsPopped));
setlength(NewList, c);
i:=FTotalItemsPopped;
while i < FTotalItemsPushed do begin
NewList[i mod c] := FList[i mod FQueueSize];
inc(i);
end;
FList := NewList;
FQueueSize:=c;
FFifoList.Grow(ADelta);
finally
FMonitor.Leave;
end;