FpDebug: extend/rework TFpThreadWorkerQueue

git-svn-id: trunk@63828 -
This commit is contained in:
martin 2020-08-25 17:31:51 +00:00
parent 005fe7208d
commit c9916553dd

View File

@ -54,23 +54,36 @@ type
TFpThreadWorkerItem = class
private const
TWSTATE_NEW = cardinal(0);
TWSTATE_RUNNING = cardinal(1);
TWSTATE_WAITING = cardinal(2);
TWSTATE_DONE = cardinal(3);
TWSTATE_NEW = cardinal(0);
TWSTATE_RUNNING = cardinal(1);
TWSTATE_WAITING = cardinal(2);
TWSTATE_WAIT_WORKER = cardinal(3);
TWSTATE_DONE = cardinal(4);
TWSTATE_CANCEL = cardinal(5);
private
FDone: Cardinal;
FState: Cardinal;
FError: Exception;
FRefCnt: LongInt;
FStopRequested: Boolean;
function GetIsCancelled: Boolean;
function GetIsDone: Boolean;
protected
procedure DoExecute; virtual;
procedure DoFinished; virtual;
procedure Execute(MyWorkerThread: TFpWorkerThread); // called by worker thread
procedure WaitFor(AnMainWaitEvent: PRTLEvent); // called by main thread => calls DoExecute, if needed
procedure Cancel(AnMainWaitEvent: PRTLEvent); // called by main thread => calls DoExecute, if needed
procedure ExecuteInThread(MyWorkerThread: TFpWorkerThread); // called by worker thread
procedure WaitForFinish(AnMainWaitEvent: PRTLEvent; AWaitForExecInThread: Boolean); // called by main thread => calls DoExecute, if needed
procedure WaitForCancel(AnMainWaitEvent: PRTLEvent); // called by main thread => calls DoExecute, if needed
public
procedure Execute; // Exec in main thread / Only if NOT queued
procedure AddRef;
procedure DecRef;
function RefCount: Integer;
procedure RequestStop;
property Error: Exception read FError;
property IsDone: Boolean read GetIsDone;
property IsCancelled: Boolean read GetIsCancelled;
property StopRequested: Boolean read FStopRequested; // Can be checeked by the worker / optional
end;
{ TFpWorkerThread }
@ -95,25 +108,29 @@ type
FWorkerThreadList: TFpWorkerThreadList;
FMainWaitEvent: PRTLEvent;
function GetCurrentCount: Integer;
function GetIdleThreadCount: integer;
function GetThreadCount: integer;
function GetWantedCount: Integer;
procedure SetThreadCount(AValue: integer);
protected
FIdleThreadCount: integer;
function RemoveThread(Item: TFpWorkerThread): Integer;
property WantedCount: Integer read GetWantedCount;
property CurrentCount: Integer read GetCurrentCount;
property ThreadMonitor: TLazMonitor read FThreadMonitor;
public
constructor Create(AQueueDepth: Integer = 10; PopTimeout: cardinal = INFINITE);
constructor Create(AQueueDepth: Integer = 10; PushTimeout: cardinal = INFINITE; PopTimeout: cardinal = INFINITE);
destructor Destroy; override; // Will not wait for the threads.
procedure Clear; // Not thread safe // remove all none running items
function PushItem(const AItem: TFpThreadWorkerItem): TWaitResult;
procedure PushItem(const AItem: TFpThreadWorkerItem);
procedure PushItemIdleOrRun(const AItem: TFpThreadWorkerItem);
procedure WaitForItem(const AItem: TFpThreadWorkerItem); // called by main thread => calls DoExecute, if needed
procedure RemoveItem(const AItem: TFpThreadWorkerItem); // wait but do not execute
procedure WaitForItem(const AItem: TFpThreadWorkerItem; AWaitForExecInThread: Boolean = False); // called by main thread => calls DoExecute, if needed
procedure RemoveItem(const AItem: TFpThreadWorkerItem); // wait if already running
property ThreadCount: integer read GetThreadCount write SetThreadCount; // Not thread safe
property IdleThreadCount: integer read GetIdleThreadCount;
property MainWaitEvent: PRTLEvent read FMainWaitEvent;
end;
@ -135,7 +152,7 @@ var
{$ifdef cpui386}
GMode: TFPDMode = dm32 deprecated;
{$else}
GMode: TFPDMode = dm64 deprecated;
GMode: TFPDMode = dm64; // deprecated;
{$endif}
function CompareUtf8BothCase(AnUpper, AnLower, AnUnknown: PChar): Boolean;
@ -374,6 +391,16 @@ end;
{ TFpThreadWorkerItem }
function TFpThreadWorkerItem.GetIsDone: Boolean;
begin
Result := InterLockedExchangeAdd(FState, 0) = TWSTATE_DONE;
end;
function TFpThreadWorkerItem.GetIsCancelled: Boolean;
begin
Result := InterLockedExchangeAdd(FState, 0) = TWSTATE_CANCEL;
end;
procedure TFpThreadWorkerItem.DoExecute;
begin
//
@ -385,44 +412,93 @@ begin
Destroy;
end;
procedure TFpThreadWorkerItem.Execute(MyWorkerThread: TFpWorkerThread);
procedure TFpThreadWorkerItem.ExecuteInThread(MyWorkerThread: TFpWorkerThread);
var
st: Cardinal;
OldState: Cardinal;
begin
st := InterLockedExchange(FDone, TWSTATE_RUNNING);
if st = TWSTATE_NEW then begin
DoExecute;
OldState := InterlockedCompareExchange(FState, TWSTATE_RUNNING, TWSTATE_NEW);
st := InterLockedExchange(FDone, TWSTATE_DONE);
if st = TWSTATE_WAITING then
RTLeventSetEvent(MyWorkerThread.Queue.MainWaitEvent);
if (OldState in [TWSTATE_NEW, TWSTATE_WAIT_WORKER]) then begin
(* State is now either TWSTATE_RUNNING or TWSTATE_WAIT_WORKER *)
if not StopRequested then
DoExecute;
OldState := InterLockedExchange(FState, TWSTATE_DONE);
if (OldState in [TWSTATE_WAITING, TWSTATE_WAIT_WORKER, TWSTATE_CANCEL]) then
RTLeventSetEvent(MyWorkerThread.Queue.MainWaitEvent)
else
// If other threads have a ref, they may call WaitForFinish and read data from this.
if (InterLockedExchangeAdd(FRefCnt, 0) > 1) then
WriteBarrier;
end;
end;
procedure TFpThreadWorkerItem.WaitFor(AnMainWaitEvent: PRTLEvent);
procedure TFpThreadWorkerItem.WaitForFinish(AnMainWaitEvent: PRTLEvent;
AWaitForExecInThread: Boolean);
var
st: Cardinal;
OldState: Cardinal;
begin
st := InterLockedExchange(FDone, TWSTATE_WAITING);
if st = TWSTATE_NEW then begin
DoExecute;
if AWaitForExecInThread then begin
// TWSTATE_NEW : mark TWSTATE_WAIT_WORKER, wait
// TWSTATE_RUNNING : mark TWSTATE_WAIT_WORKER, wait
// TWSTATE_WAITING : impossible
// TWSTATE_WAIT_WORKER : impossible
// TWSTATE_DONE : KEEP (will be restored at exit)
// TWSTATE_CANCEL : not allowed
OldState := InterLockedExchange(FState, TWSTATE_WAIT_WORKER);
assert(not (OldState in [TWSTATE_WAITING, TWSTATE_WAIT_WORKER, TWSTATE_CANCEL]), 'TFpThreadWorkerItem.WaitForFinish: not (OldState in [TWSTATE_WAITING, TWSTATE_WAIT_WORKER, TWSTATE_CANCEL])');
if (OldState in [TWSTATE_NEW, TWSTATE_RUNNING]) then begin
RTLeventWaitFor(AnMainWaitEvent);
RTLeventResetEvent(AnMainWaitEvent);
end
else
ReadBarrier;
end
else
if st = TWSTATE_RUNNING then begin
begin
OldState := InterLockedExchange(FState, TWSTATE_WAITING);
assert(not (OldState in [TWSTATE_WAITING, TWSTATE_WAIT_WORKER, TWSTATE_CANCEL]), 'TFpThreadWorkerItem.WaitForFinish: not (OldState in [TWSTATE_WAITING, TWSTATE_WAIT_WORKER, TWSTATE_CANCEL])');
if OldState = TWSTATE_NEW then begin
DoExecute;
end
else
if OldState = TWSTATE_RUNNING then begin
RTLeventWaitFor(AnMainWaitEvent);
RTLeventResetEvent(AnMainWaitEvent);
end
else
ReadBarrier;
end;
FState := TWSTATE_DONE; // No interlocked: The worker thread is done, so only the main thread is accessing this now
end;
procedure TFpThreadWorkerItem.WaitForCancel(AnMainWaitEvent: PRTLEvent);
var
OldState: Cardinal;
begin
// TWSTATE_NEW : mark TWSTATE_CANCEL
// TWSTATE_RUNNING : mark TWSTATE_CANCEL, wait
// TWSTATE_WAITING : impossible
// TWSTATE_WAIT_WORKER : impossible
// TWSTATE_DONE : KEEP (will be restored at exit)
// TWSTATE_CANCEL : KEEP
RequestStop;
OldState := InterLockedExchange(FState, TWSTATE_CANCEL); // Prevent thread form executing this
assert(not (OldState in [TWSTATE_WAITING, TWSTATE_WAIT_WORKER]), 'TFpThreadWorkerItem.WaitForCancel: not (OldState in [TWSTATE_WAITING, TWSTATE_WAIT_WORKER])');
if OldState = TWSTATE_RUNNING then begin
RTLeventWaitFor(AnMainWaitEvent);
RTLeventResetEvent(AnMainWaitEvent);
end
else
if OldState = TWSTATE_DONE then begin
FState := TWSTATE_DONE;
end;
end;
procedure TFpThreadWorkerItem.Cancel(AnMainWaitEvent: PRTLEvent);
var
st: Cardinal;
procedure TFpThreadWorkerItem.Execute;
begin
st := InterLockedExchange(FDone, TWSTATE_WAITING); // Prevent thread form executing this
if st = TWSTATE_RUNNING then begin
RTLeventWaitFor(AnMainWaitEvent);
RTLeventResetEvent(AnMainWaitEvent);
end;
DoExecute;
FState := TWSTATE_DONE;
end;
procedure TFpThreadWorkerItem.AddRef;
@ -438,6 +514,17 @@ begin
DoFinished;
end;
function TFpThreadWorkerItem.RefCount: Integer;
begin
Result := InterLockedExchangeAdd(FRefCnt, 0);
end;
procedure TFpThreadWorkerItem.RequestStop;
begin
FStopRequested := True;
InterlockedCompareExchange(FState, TWSTATE_CANCEL, TWSTATE_NEW); // if not running, then WaitForcancel
end;
{ TFpWorkerThread }
constructor TFpWorkerThread.Create(AQueue: TFpThreadWorkerQueue);
@ -450,12 +537,22 @@ end;
procedure TFpWorkerThread.Execute;
var
WorkItem: TFpThreadWorkerItem;
IsMarkedIdle: Boolean;
begin
IsMarkedIdle := False;
while not Terminated do begin
if (FQueue.PopItem(WorkItem) <> wrSignaled) or
if (FQueue.PopItemTimeout(WorkItem, 0) <> wrSignaled) or
(WorkItem = nil)
then
Continue;
then begin
if not IsMarkedIdle then begin
InterLockedIncrement(FQueue.FIdleThreadCount);
IsMarkedIdle := True;
end;
if (FQueue.PopItem(WorkItem) <> wrSignaled) or
(WorkItem = nil)
then
Continue;
end;
if WorkItem is TFpThreadWorkerTerminateItem then begin
WorkItem.DecRef;
@ -474,9 +571,25 @@ begin
Continue;
end;
WorkItem.Execute(Self);
WorkItem.DecRef;
if IsMarkedIdle then begin
InterLockedDecrement(FQueue.FIdleThreadCount);
IsMarkedIdle := False;
end;
try
WorkItem.ExecuteInThread(Self);
except
on E: Exception do
WorkItem.FError := E;
end;
try
WorkItem.DecRef;
except
on E: Exception do
debugln('Exception in WorkItem.DecRef: %s', [E.Message]);
end;
end;
if IsMarkedIdle then
InterLockedDecrement(FQueue.FIdleThreadCount);
FQueue.RemoveThread(Self);
end;
@ -497,6 +610,11 @@ begin
Result := InterLockedExchangeAdd(FCurrentCount, 0);
end;
function TFpThreadWorkerQueue.GetIdleThreadCount: integer;
begin
Result := InterLockedExchangeAdd(FIdleThreadCount, 0);
end;
function TFpThreadWorkerQueue.GetWantedCount: Integer;
begin
Result := InterLockedExchangeAdd(FWantedCount, 0);
@ -547,10 +665,10 @@ begin
end;
constructor TFpThreadWorkerQueue.Create(AQueueDepth: Integer;
PopTimeout: cardinal);
PushTimeout: cardinal; PopTimeout: cardinal);
begin
FThreadMonitor:=TLazMonitor.create;
inherited create(AQueueDepth, 0, PopTimeout);
inherited create(AQueueDepth, PushTimeout, PopTimeout);
FMainWaitEvent := RTLEventCreate;
FWorkerThreadList := TFpWorkerThreadList.Create(False);
end;
@ -559,21 +677,31 @@ destructor TFpThreadWorkerQueue.Destroy;
var
WorkItem: TFpThreadWorkerItem;
i: Integer;
mt: Boolean;
begin
Lock;
FThreadMonitor.Enter;
try
for i := 0 to FWorkerThreadList.Count - 1 do
FWorkerThreadList[i].Terminate; // also signals that the queue is no longer valid
while TryPopItemUnprotected(WorkItem) do begin
WorkItem.RequestStop;
WorkItem.DecRef;
end;
finally
FThreadMonitor.Leave;
Unlock;
end;
Clear;
ThreadCount := 0;
// Wait for threads.
mt := MainThreadID = ThreadID;
while CurrentCount > 0 do begin
sleep(1);
if mt then
CheckSynchronize(1);
if TotalItemsPushed = TotalItemsPopped then
ThreadCount := 0; // Add more TFpThreadWorkerTerminateItem
end;
@ -595,26 +723,52 @@ begin
WorkItem.DecRef;
end;
function TFpThreadWorkerQueue.PushItem(const AItem: TFpThreadWorkerItem
): TWaitResult;
procedure TFpThreadWorkerQueue.PushItem(const AItem: TFpThreadWorkerItem);
begin
repeat
if TotalItemsPopped = TotalItemsPushed then
AItem.AddRef;
Lock;
try
if TotalItemsPushed - TotalItemsPopped = QueueSize then
Grow(Min(QueueSize, 100));
AItem.AddRef;
Result := inherited PushItem(AItem);
until Result = wrSignaled;
inherited TryPushItemUnprotected(AItem);
finally
Unlock;
end;
end;
procedure TFpThreadWorkerQueue.PushItemIdleOrRun(
const AItem: TFpThreadWorkerItem);
var
q: Boolean;
begin
AItem.AddRef;
Lock;
try
q := IdleThreadCount > 0;
if q then begin
if TotalItemsPushed - TotalItemsPopped = QueueSize then
Grow(Min(QueueSize, 100));
inherited TryPushItemUnprotected(AItem);
end;
finally
Unlock;
end;
if not q then begin
AItem.DoExecute;
AItem.DecRef;
end;
end;
procedure TFpThreadWorkerQueue.RemoveItem(const AItem: TFpThreadWorkerItem);
begin
if AItem <> nil then
AItem.Cancel(Self.MainWaitEvent);
AItem.WaitForCancel(Self.MainWaitEvent);
end;
procedure TFpThreadWorkerQueue.WaitForItem(const AItem: TFpThreadWorkerItem);
procedure TFpThreadWorkerQueue.WaitForItem(const AItem: TFpThreadWorkerItem;
AWaitForExecInThread: Boolean);
begin
AItem.WaitFor(Self.MainWaitEvent);
AItem.WaitForFinish(Self.MainWaitEvent, AWaitForExecInThread);
end;
finalization