FpDebug: Allow multiple threads to call TFpThreadWorkerItem.WaitForFinish

git-svn-id: trunk@65187 -
This commit is contained in:
martin 2021-06-08 20:58:38 +00:00
parent 93b69aefd6
commit e94b71c589

View File

@ -50,6 +50,8 @@ type
TFpThreadWorkerQueue = class;
TFpWorkerThread = class;
PPRTLEvent = ^PRTLEvent;
{ TFpThreadWorkerItem }
TFpThreadWorkerItem = class
@ -60,7 +62,9 @@ type
TWSTATE_WAIT_WORKER = cardinal(3);
TWSTATE_DONE = cardinal(4);
TWSTATE_CANCEL = cardinal(5);
EVENT_DONE_INDICATOR = Pointer(1);
private
FWorkerItemEventPtr: PPRTLEvent;
FState: Cardinal;
FError: Exception;
FRefCnt: LongInt;
@ -68,14 +72,16 @@ type
FLogGroup: PLazLoggerLogGroup;
function GetIsCancelled: Boolean;
function GetIsDone: Boolean;
function MaybeWaitForPreviousWait(AQueue: TFpThreadWorkerQueue; AnEvntPtr: PPRTLEvent): boolean;
function MaybeWaitForEvent(AnEvnt: PRTLEvent): Boolean; inline;
protected
procedure DoExecute; virtual;
procedure DoFinished; virtual;
procedure DoUnQueued; virtual; // When queue shuts down / Not called when Item is Cancelled
procedure ExecuteInThread(MyWorkerThread: TFpWorkerThread); // called by worker thread
procedure WaitForFinish(AnMainWaitEvent: PRTLEvent; AWaitForExecInThread: Boolean); // called by main thread => calls DoExecute, if needed
procedure WaitForCancel(AnMainWaitEvent: PRTLEvent); // called by main thread => calls DoExecute, if needed
procedure WaitForFinish(AQueue: TFpThreadWorkerQueue; AWaitForExecInThread: Boolean); // called by main thread => calls DoExecute, if needed
procedure WaitForCancel(AQueue: TFpThreadWorkerQueue); // called by main thread => calls DoExecute, if needed
public
procedure Execute; // Exec in main thread / Only if NOT queued
procedure AddRef;
@ -124,6 +130,8 @@ type
FLogGroup: PLazLoggerLogGroup;
FIdleThreadCount: integer;
function GetRtlEvent: PRTLEvent;
procedure FreeRtrEvent(AnEvent: PRTLEvent);
function RemoveThread(Item: TFpWorkerThread): Integer;
property WantedCount: Integer read GetWantedCount;
property CurrentCount: Integer read GetCurrentCount;
@ -573,6 +581,7 @@ end;
procedure TFpThreadWorkerItem.ExecuteInThread(MyWorkerThread: TFpWorkerThread);
var
OldState: Cardinal;
Evnt: PPRTLEvent;
begin
OldState := InterlockedCompareExchange(FState, TWSTATE_RUNNING, TWSTATE_NEW);
DebugLn(FLogGroup, '%s!%s Executing WorkItem: %s "%s" StopRequested=%s', [dbgsThread, DbgSTime, dbgsWorkItemState(OldState), DebugText, dbgs(StopRequested)]);
@ -586,8 +595,14 @@ begin
finally
DebugLnExit(FLogGroup);
OldState := InterLockedExchange(FState, TWSTATE_DONE);
if (OldState in [TWSTATE_WAITING, TWSTATE_WAIT_WORKER, TWSTATE_CANCEL]) then
RTLeventSetEvent(MyWorkerThread.Queue.MainWaitEvent)
if (OldState in [TWSTATE_WAITING, TWSTATE_WAIT_WORKER, TWSTATE_CANCEL]) then begin
// The FState is in TWSTATE_WAIT___ or TWSTATE_CANCEL
// => so the event will exist, until it returned from RTLEventWaitFor
// It is save to access
Evnt := InterlockedExchange(FWorkerItemEventPtr, EVENT_DONE_INDICATOR);
if Evnt <> nil then
RTLEventSetEvent(Evnt^);
end
else
// If other threads have a ref, they may call WaitForFinish and read data from this.
if (InterLockedExchangeAdd(FRefCnt, 0) > 1) then
@ -597,54 +612,137 @@ begin
end;
end;
procedure TFpThreadWorkerItem.WaitForFinish(AnMainWaitEvent: PRTLEvent;
function TFpThreadWorkerItem.MaybeWaitForPreviousWait(
AQueue: TFpThreadWorkerQueue; AnEvntPtr: PPRTLEvent): boolean;
var
ExistingEvnt: Pointer;
begin
Result := False;
(* - Set FWorkerItemEventPtr before changing the state.
- Once the NewStateForWait is set to TWSTATE_WAIT___ or TWSTATE_CANCEL the event
belongs to the thread, until it has been waited for
- If there is an ExistingEvnt, it must be SET once our event was waited for.
*)
ExistingEvnt := InterlockedExchange(FWorkerItemEventPtr, AnEvntPtr);
if ExistingEvnt <> nil then begin
// Someone is already waiting for this Item
Result := True;
(* EVENT_DONE_INDICATOR
If we get EVENT_DONE_INDICATOR, then the WorkItem is done too => no need to wait
Return our item. The WorkThread is not going to use it anymore.
*)
if ExistingEvnt <> EVENT_DONE_INDICATOR then begin
(* - WorkItem may have advanced the FState to TWSTATE_DONE.
But in that case, it will have set our Evnt.
- If somebody else is waiting, their decission of "AWaitForExecInThread"
will be honored
*)
DebugLnEnter(FLogGroup);
RTLEventWaitFor(AnEvntPtr^);
RTLEventSetEvent(ExistingEvnt); // Signal the other waiting thread
DebugLnExit(FLogGroup, '%s!%s DONE WaitForFinish (with existing waiting): "%s" StopRequested=%s', [dbgsThread, DbgSTime, DebugText, dbgs(StopRequested)]);
end;
assert(FState = TWSTATE_DONE, 'TFpThreadWorkerItem.WaitForFinish: FState = TWSTATE_DONE');
end;
end;
function TFpThreadWorkerItem.MaybeWaitForEvent(AnEvnt: PRTLEvent): Boolean;
var
ExistingEvntPtr: PPRTLEvent;
begin
Result := False;
ExistingEvntPtr := InterlockedExchange(FWorkerItemEventPtr, EVENT_DONE_INDICATOR);
if (ExistingEvntPtr <> nil) and (ExistingEvntPtr^ <> nil) and (ExistingEvntPtr^ <> AnEvnt) then begin // Some one else is waiting
RTLEventSetEvent(ExistingEvntPtr^);
RTLEventWaitFor(AnEvnt);
Result := True;
end;
end;
procedure TFpThreadWorkerItem.WaitForFinish(AQueue: TFpThreadWorkerQueue;
AWaitForExecInThread: Boolean);
var
OldState: Cardinal;
Evnt: PRTLEvent;
begin
(* | True (wait for run in work thread) | False (run in caller thread)
TWSTATE_NEW : mark TWSTATE_WAIT_WORKER => wait : ~
TWSTATE_RUNNING : mark TWSTATE_WAIT_WORKER => wait : mark TWSTATE_WAITING => wait
TWSTATE_WAITING : 2ndary wait call, leave to primary : ~
TWSTATE_WAIT_WORKER : 2ndary wait call, leave to primary : ~
TWSTATE_DONE : KEEP (will be restored at exit) : ~
TWSTATE_CANCEL : not allowed : ~
*)
if FState = TWSTATE_DONE then
exit;
Evnt := AQueue.GetRtlEvent;
if MaybeWaitForPreviousWait(AQueue, @Evnt) then begin
AQueue.FreeRtrEvent(Evnt);
exit;
end;
(* - There was no other thread waiting
- MaybeWaitForPreviousWait has set FWorkerItemEventPtr, therefore:
=> *** NO OTHER THREAD WILL ENTER THE CODE BELOW ***
- We must set FState to TWSTATE_WAIT___ or TWSTATE_CANCEL
=> in order for the WorkerThread to trigger the event
=> if the WorkerThread has gone TWSTATE_DONE the event will NOT be triggered
*)
if AWaitForExecInThread then begin
// TWSTATE_NEW : mark TWSTATE_WAIT_WORKER, wait
// TWSTATE_RUNNING : mark TWSTATE_WAIT_WORKER, wait
// TWSTATE_WAITING : impossible
// TWSTATE_WAIT_WORKER : impossible
// TWSTATE_DONE : KEEP (will be restored at exit)
// TWSTATE_CANCEL : not allowed
OldState := InterLockedExchange(FState, TWSTATE_WAIT_WORKER);
OldState := InterlockedExchange(FState, TWSTATE_WAIT_WORKER);
DebugLn(FLogGroup, '%s!%s WaitForFinish (WITH exe): %s "%s" StopRequested=%s', [dbgsThread, DbgSTime, dbgsWorkItemState(OldState), DebugText, dbgs(StopRequested)]);
assert(not (OldState in [TWSTATE_WAITING, TWSTATE_WAIT_WORKER, TWSTATE_CANCEL]), 'TFpThreadWorkerItem.WaitForFinish: not (OldState in [TWSTATE_WAITING, TWSTATE_WAIT_WORKER, TWSTATE_CANCEL])');
if (OldState in [TWSTATE_NEW, TWSTATE_RUNNING]) then begin
DebugLnEnter(FLogGroup);
RTLeventWaitFor(AnMainWaitEvent);
RTLeventResetEvent(AnMainWaitEvent);
RTLEventWaitFor(Evnt);
DebugLnExit(FLogGroup, '%s!%s DONE WaitForFinish (WITH exe): "%s" StopRequested=%s', [dbgsThread, DbgSTime, DebugText, dbgs(StopRequested)]);
end
else
ReadBarrier;
else begin
assert(OldState = TWSTATE_DONE, 'TFpThreadWorkerItem.WaitForFinish: OldState = TWSTATE_DONE');
FState := TWSTATE_DONE;
if not MaybeWaitForEvent(Evnt) then
ReadBarrier; // State must have advanced to TWSTATE_DONE;
end;
end
else
begin
OldState := InterLockedExchange(FState, TWSTATE_WAITING);
OldState := InterlockedExchange(FState, TWSTATE_WAITING);
DebugLn(FLogGroup, '%s!%s WaitForFinish (NO exe): %s "%s" StopRequested=%s', [dbgsThread, DbgSTime, dbgsWorkItemState(OldState), DebugText, dbgs(StopRequested)]);
assert(not (OldState in [TWSTATE_WAITING, TWSTATE_WAIT_WORKER, TWSTATE_CANCEL]), 'TFpThreadWorkerItem.WaitForFinish: not (OldState in [TWSTATE_WAITING, TWSTATE_WAIT_WORKER, TWSTATE_CANCEL])');
if OldState = TWSTATE_NEW then begin
DoExecute;
InterLockedExchange(FState, TWSTATE_DONE);
MaybeWaitForEvent(Evnt);
end
else
if OldState = TWSTATE_RUNNING then begin
DebugLnEnter(FLogGroup);
RTLeventWaitFor(AnMainWaitEvent);
RTLeventResetEvent(AnMainWaitEvent);
RTLEventWaitFor(Evnt);
DebugLnExit(FLogGroup, '%s!%s DONE WaitForFinish (NO exe): "%s" StopRequested=%s', [dbgsThread, DbgSTime, DebugText, dbgs(StopRequested)]);
end
else
ReadBarrier;
else begin
assert(OldState = TWSTATE_DONE, 'TFpThreadWorkerItem.WaitForFinish: OldState = TWSTATE_DONE');
FState := TWSTATE_DONE;
if not MaybeWaitForEvent(Evnt) then
ReadBarrier;
end;
end;
FState := TWSTATE_DONE; // No interlocked: The worker thread is done, so only the main thread is accessing this now
AQueue.FreeRtrEvent(Evnt);
assert(FState = TWSTATE_DONE, 'TFpThreadWorkerItem.WaitForFinish: FState = TWSTATE_DONE');
end;
procedure TFpThreadWorkerItem.WaitForCancel(AnMainWaitEvent: PRTLEvent);
procedure TFpThreadWorkerItem.WaitForCancel(AQueue: TFpThreadWorkerQueue);
var
OldState: Cardinal;
Evnt: PRTLEvent;
begin
// TWSTATE_NEW : mark TWSTATE_CANCEL
// TWSTATE_RUNNING : mark TWSTATE_CANCEL, wait
@ -652,20 +750,39 @@ begin
// TWSTATE_WAIT_WORKER : impossible
// TWSTATE_DONE : KEEP (will be restored at exit)
// TWSTATE_CANCEL : KEEP
RequestStop;
FStopRequested := True;
//RequestStop; // Can not call RequestStop / might change the state => must first call MaybeWaitForPreviousWait
if FState = TWSTATE_DONE then
exit;
Evnt := AQueue.GetRtlEvent;
if MaybeWaitForPreviousWait(AQueue, @Evnt) then begin
AQueue.FreeRtrEvent(Evnt);
exit;
end;
(* - There was no other thread waiting
- MaybeWaitForPreviousWait has set FWorkerItemEventPtr, therefore:
=> *** NO OTHER THREAD WILL ENTER THE CODE BELOW ***
*)
OldState := InterLockedExchange(FState, TWSTATE_CANCEL); // Prevent thread form executing this
Debugln(FLogGroup, '%s!%s WaitForCancel: %s "%s"', [dbgsThread, DbgSTime, dbgsWorkItemState(OldState), DebugText]);
assert(not (OldState in [TWSTATE_WAITING, TWSTATE_WAIT_WORKER]), 'TFpThreadWorkerItem.WaitForCancel: not (OldState in [TWSTATE_WAITING, TWSTATE_WAIT_WORKER])');
if OldState = TWSTATE_RUNNING then begin
DebugLnEnter(FLogGroup);
RTLeventWaitFor(AnMainWaitEvent);
RTLeventResetEvent(AnMainWaitEvent);
RTLEventWaitFor(Evnt);
DebugLnExit(FLogGroup, '%s!%s DONE WaitForCancel: "%s"', [dbgsThread, DbgSTime, DebugText]);
end
else
if OldState = TWSTATE_DONE then begin
FState := TWSTATE_DONE;
else begin
if OldState = TWSTATE_DONE then begin
FState := TWSTATE_DONE;
end;
MaybeWaitForEvent(Evnt);
end;
AQueue.FreeRtrEvent(Evnt);
end;
procedure TFpThreadWorkerItem.Execute;
@ -853,6 +970,22 @@ begin
end;
end;
function TFpThreadWorkerQueue.GetRtlEvent: PRTLEvent;
begin
Result := InterlockedExchange(FMainWaitEvent, nil);
if Result = nil then
Result := RTLEventCreate;
end;
procedure TFpThreadWorkerQueue.FreeRtrEvent(AnEvent: PRTLEvent);
begin
assert(AnEvent <> nil, 'TFpThreadWorkerQueue.FreeRtrEvent: AnEvent <> nil');
RTLEventResetEvent(AnEvent);
AnEvent := InterlockedExchange(FMainWaitEvent, AnEvent);
if AnEvent <> nil then
RTLEventDestroy(AnEvent);
end;
function TFpThreadWorkerQueue.RemoveThread(Item: TFpWorkerThread): Integer;
begin
FThreadMonitor.Enter;
@ -1001,13 +1134,13 @@ end;
procedure TFpThreadWorkerQueue.RemoveItem(const AItem: TFpThreadWorkerItem);
begin
if AItem <> nil then
AItem.WaitForCancel(Self.MainWaitEvent);
AItem.WaitForCancel(Self);
end;
procedure TFpThreadWorkerQueue.WaitForItem(const AItem: TFpThreadWorkerItem;
AWaitForExecInThread: Boolean);
begin
AItem.WaitForFinish(Self.MainWaitEvent, AWaitForExecInThread);
AItem.WaitForFinish(Self, AWaitForExecInThread);
end;
initialization