lazarus/components/multithreadprocs/mtprocs.pas
2021-06-10 22:11:48 +00:00

891 lines
26 KiB
ObjectPascal

{
**********************************************************************
This file is part of the Free Pascal run time library.
See the file COPYING.FPC, included in this distribution,
for details about the license.
**********************************************************************
Unit for light weight threads.
Copyright (C) 2008 Mattias Gaertner mattias@freepascal.org
Abstract:
Light weight threads.
This unit provides methods to easily run a procedure/method with several
threads at once.
}
unit MTProcs;
{$mode objfpc}{$H+}
{$inline on}
{$ModeSwitch nestedprocvars}
interface
uses
Classes, SysUtils, MTPCPU;
type
TProcThreadGroup = class;
TProcThreadPool = class;
TProcThread = class;
{ TMultiThreadProcItem }
// States of a TMultiThreadProcItem (stored in its FState field).
TMTPThreadState = (
mtptsNone, // first value; never assigned explicitly in this unit
mtptsActive, // set when a thread is started or woken for work, or after a wait ended
mtptsWaitingForIndex, // set by WaitForIndexRange before blocking on other indices
mtptsWaitingFailed, // set by a waker when the wait is aborted (group exception or pool destruction)
mtptsInactive, // set when a pool thread found no more work
mtptsTerminated // set by TProcThread.Terminating
);
// Per-task context passed to every task callback. It also carries the wait
// state used by WaitForIndexRange.
TMultiThreadProcItem = class
private
FGroup: TProcThreadGroup; // owning group; nil when running single threaded
FIndex: PtrInt; // index this item is currently working on
FThread: TProcThread; // set by TProcThread.Create; not set for a group's starter item
FWaitingForIndexEnd: PtrInt; // last index of the range being waited for
FWaitingForIndexStart: PtrInt; // first index of the range being waited for
fWaitForPool: PRTLEvent; // event the owner blocks on; created by TProcThread.Create or lazily by WaitForIndexRange
FState: TMTPThreadState;
public
destructor Destroy; override;
// Blocks until all indices StartIndex..EndIndex have finished.
// Returns false if EndIndex>=Index or if the wait was aborted.
function WaitForIndexRange(StartIndex, EndIndex: PtrInt): boolean;
// Same as WaitForIndexRange(Index,Index).
function WaitForIndex(Index: PtrInt): boolean; inline;
// Computes the inclusive range BlockStart..BlockEnd of block number Index
// for a loop of LoopLength iterations cut into blocks of BlockSize.
procedure CalcBlock(Index, BlockSize, LoopLength: PtrInt;
out BlockStart, BlockEnd: PtrInt); inline;
property Index: PtrInt read FIndex;
property Group: TProcThreadGroup read FGroup;
property WaitingForIndexStart: PtrInt read FWaitingForIndexStart;
property WaitingForIndexEnd: PtrInt read FWaitingForIndexEnd;
property Thread: TProcThread read FThread;
end;
{ TProcThread }
// Selects which pair of FNext/FPrev links of a TProcThread is used.
TMTPThreadList = (
mtptlPool, // links for the pool's active/inactive/terminated lists
mtptlGroup // links for the thread list of a group
);
// Worker thread of the pool. It is a member of up to two doubly linked
// lists at once: one pool list and one group list (see TMTPThreadList).
TProcThread = class(TThread)
private
FItem: TMultiThreadProcItem; // created in Create, freed in Destroy
FNext, FPrev: array[TMTPThreadList] of TProcThread; // list links, one pair per list type
// Inserts Self at the head of the list starting at First.
procedure AddToList(var First: TProcThread; ListType: TMTPThreadList); inline;
// Unlinks Self from the list starting at First.
procedure RemoveFromList(var First: TProcThread; ListType: TMTPThreadList); inline;
// Moves Self to the pool's terminated list; E is the exception that ended
// the thread, or nil.
procedure Terminating(aPool: TProcThreadPool; E: Exception);
public
constructor Create;
destructor Destroy; override;
procedure Execute; override;
property Item: TMultiThreadProcItem read FItem;
end;
// Task callback signatures accepted by the pool: method, plain procedure and
// nested procedure. Index is the task index, Data the pointer supplied by the
// caller of DoParallel*, Item the context of the executing thread.
TMTMethod = procedure(Index: PtrInt; Data: Pointer;
Item: TMultiThreadProcItem) of object;
TMTProcedure = procedure(Index: PtrInt; Data: Pointer;
Item: TMultiThreadProcItem);
TMTNestedProcedure = procedure(Index: PtrInt; Data: Pointer;
Item: TMultiThreadProcItem) is nested;
{ TProcThreadGroup
Each task creates a new group of threads.
A group can either need more threads or it has finished and waits for its
threads to end.
The thread that created the group is not in the list FFirstThread. }
// State of a TProcThreadGroup; also identifies the pool list it is in.
TMTPGroupState = (
mtpgsNone, // not in any pool list (set by RemoveFromList)
mtpgsNeedThreads, // the group is waiting for more threads to help
mtpgsFinishing, // the group is waiting for its threads to finish
mtpgsException // there was an exception => close asap
);
// One parallel task: the index range, the callback to run and the helper
// threads currently working on it.
TProcThreadGroup = class
private
FEndIndex: PtrInt;
FException: Exception; // exception passed to EnterExceptionState; re-raised by DoParallelIntern
FFirstRunningIndex: PtrInt; // recomputed in IndexComplete
FFirstThread: TProcThread; // head of the helper thread list (mtptlGroup links)
FLastRunningIndex: PtrInt; // highest index handed out so far
FMaxThreads: PtrInt;
FNext, FPrev: TProcThreadGroup; // links of the pool's group lists
FPool: TProcThreadPool;
FStarterItem: TMultiThreadProcItem; // item of the thread that called DoParallel*
FStartIndex: PtrInt;
FState: TMTPGroupState;
FTaskData: Pointer;
FTaskFrame: Pointer; // when assigned, FTaskProcedure is called via CallLocalProc
FTaskMethod: TMTMethod;
FTaskNested: TMTNestedProcedure;
FTaskProcedure: TMTProcedure;
FThreadCount: PtrInt; // initialised to 1 by DoParallelIntern, then counts added helpers
// Inserts Self at the head of a pool group list and sets FState to ListType.
procedure AddToList(var First: TProcThreadGroup; ListType: TMTPGroupState); inline;
// Unlinks Self from a pool group list and sets FState to mtpgsNone.
procedure RemoveFromList(var First: TProcThreadGroup); inline;
function NeedMoreThreads: boolean; inline;
// Hands the next index to Item and updates the group's pool list.
procedure IncreaseLastRunningIndex(Item: TMultiThreadProcItem);
procedure AddThread(AThread: TProcThread);
procedure RemoveThread(AThread: TProcThread); inline;
// Invokes the configured task callback.
procedure Run(Index: PtrInt; Data: Pointer; Item: TMultiThreadProcItem); inline;
procedure IndexComplete(Index: PtrInt);
procedure WakeThreadsWaitingForIndex;
function HasFinishedIndex(aStartIndex, aEndIndex: PtrInt): boolean;
procedure EnterExceptionState(E: Exception);
public
constructor Create;
destructor Destroy; override;
property Pool: TProcThreadPool read FPool;
property StartIndex: PtrInt read FStartIndex;
property EndIndex: PtrInt read FEndIndex;
property FirstRunningIndex: PtrInt read FFirstRunningIndex; // first started
property LastRunningIndex: PtrInt read FLastRunningIndex; // last started
property TaskData: Pointer read FTaskData;
property TaskMethod: TMTMethod read FTaskMethod;
property TaskNested: TMTNestedProcedure read FTaskNested;
property TaskProcedure: TMTProcedure read FTaskProcedure;
property TaskFrame: Pointer read FTaskFrame;
property MaxThreads: PtrInt read FMaxThreads;
property StarterItem: TMultiThreadProcItem read FStarterItem;
end;
{ TLightWeightThreadPool
Group 0 are the inactive threads }
{ TProcThreadPool }
// Pool of worker threads. The DoParallel* methods run a callback for every
// index of a range, using the calling thread plus helper threads.
TProcThreadPool = class
private
FMaxThreadCount: PtrInt; // initialised from GetSystemThreadCount, at least 1
FThreadCount: PtrInt; // incremented for every helper thread created
FFirstInactiveThread: TProcThread; // threads waiting for work
FFirstActiveThread: TProcThread; // threads currently assigned to a group
FFirstTerminatedThread: TProcThread; // finished threads waiting to be freed
FFirstGroupNeedThreads: TProcThreadGroup;
FFirstGroupFinishing: TProcThreadGroup;
FCritSection: TRTLCriticalSection; // entered around changes to the lists above
FDestroying: boolean; // set at the start of Destroy
// Raises an Exception for values below 1.
procedure SetMaxThreadCount(const AValue: PtrInt);
procedure CleanTerminatedThreads;
procedure DoParallelIntern(const AMethod: TMTMethod;
const AProc: TMTProcedure; const ANested: TMTNestedProcedure;
const AFrame: Pointer; StartIndex, EndIndex: PtrInt;
Data: Pointer = nil; MaxThreads: PtrInt = 0);
public
// for debugging only: the critical section is public:
procedure EnterPoolCriticalSection; inline;
procedure LeavePoolCriticalSection; inline;
public
constructor Create;
destructor Destroy; override;
// Run the callback for every index in StartIndex..EndIndex (inclusive).
// MaxThreads<=0 or above MaxThreadCount means MaxThreadCount.
procedure DoParallel(const AMethod: TMTMethod;
StartIndex, EndIndex: PtrInt;
Data: Pointer = nil; MaxThreads: PtrInt = 0); inline;
procedure DoParallel(const AProc: TMTProcedure;
StartIndex, EndIndex: PtrInt;
Data: Pointer = nil; MaxThreads: PtrInt = 0); inline;
procedure DoParallelNested(const ANested: TMTNestedProcedure;
StartIndex, EndIndex: PtrInt;
Data: Pointer = nil; MaxThreads: PtrInt = 0); inline;
// experimental
procedure DoParallelLocalProc(const LocalProc: Pointer;
StartIndex, EndIndex: PtrInt;
Data: Pointer = nil; MaxThreads: PtrInt = 0); // do not make this inline!
// utility functions for loops:
procedure CalcBlockSize(LoopLength: PtrInt;
out BlockCount, BlockSize: PtrInt; MinBlockSize: PtrInt = 0); inline;
public
property MaxThreadCount: PtrInt read FMaxThreadCount write SetMaxThreadCount;
property ThreadCount: PtrInt read FThreadCount;
end;
var
// Default pool; created in this unit's initialization section and freed in
// its finalization section.
ProcThreadPool: TProcThreadPool = nil;
threadvar
CurrentThread: TThread; // TProcThread sets this, you can set this for your own TThreads descendants
implementation
{ TMultiThreadProcItem }
// Releases the wait event (if one was ever created) before destroying the item.
destructor TMultiThreadProcItem.Destroy;
begin
  if Assigned(fWaitForPool) then
  begin
    RTLeventdestroy(fWaitForPool);
    fWaitForPool:=nil;
  end;
  inherited Destroy;
end;
// Blocks the calling task until every index in StartIndex..EndIndex of the
// same group has finished.
// Returns true if the range is empty, if there is no group, or if the range
// has finished. Returns false if EndIndex>=Index, if the pool is being
// destroyed, if the group is in exception state, or if the wait was aborted
// by the waker.
function TMultiThreadProcItem.WaitForIndexRange(
StartIndex, EndIndex: PtrInt): boolean;
var
aPool: TProcThreadPool;
begin
//WriteLn('TLightWeightThreadItem.WaitForIndexRange START Index='+IntToStr(Index)+' StartIndex='+IntToStr(StartIndex)+' EndIndex='+IntToStr(EndIndex));
// refuse to wait for the own index or a later one
if (EndIndex>=Index) then exit(false);
if EndIndex<StartIndex then exit(true);
if Group=nil then exit(true); // a single threaded group has no group object
// multi threaded group
aPool:=Group.Pool;
if aPool.FDestroying then exit(false); // no more wait allowed
aPool.EnterPoolCriticalSection;
try
if Group.FState=mtpgsException then begin
//WriteLn('TLightWeightThreadItem.WaitForIndexRange Index='+IntToStr(Index)+', Group closing because of error');
exit(false);
end;
if Group.HasFinishedIndex(StartIndex,EndIndex) then begin
//WriteLn('TLightWeightThreadItem.WaitForIndexRange Index='+IntToStr(Index)+', range already finished');
exit(true);
end;
// publish what we wait for while still holding the lock, so that a waker
// sees a consistent state and range
FState:=mtptsWaitingForIndex;
FWaitingForIndexStart:=StartIndex;
FWaitingForIndexEnd:=EndIndex;
if fWaitForPool=nil then
fWaitForPool:=RTLEventCreate;
// the event is reset before the lock is released, the wait happens after
RTLeventResetEvent(fWaitForPool);
finally
aPool.LeavePoolCriticalSection;
end;
//WriteLn('TLightWeightThreadItem.WaitForIndexRange '+IntToStr(Index)+' waiting ... ');
RTLeventWaitFor(fWaitForPool);
// the waker stored the outcome in FState: mtptsActive means the range has
// finished, mtptsWaitingFailed means the wait was aborted
Result:=FState=mtptsActive;
FState:=mtptsActive;
//WriteLn('TLightWeightThreadItem.WaitForIndexRange END '+IntToStr(Index));
end;
// Waits for a single index; shorthand for a range of length one.
function TMultiThreadProcItem.WaitForIndex(Index: PtrInt): boolean; inline;
begin
  exit(WaitForIndexRange(Index,Index));
end;
// Computes the inclusive index range of block number Index when a loop of
// LoopLength iterations is cut into blocks of BlockSize iterations.
// The last block is clipped so that BlockEnd never exceeds LoopLength-1.
procedure TMultiThreadProcItem.CalcBlock(Index, BlockSize, LoopLength: PtrInt;
  out BlockStart, BlockEnd: PtrInt);
var
  PastEnd: PtrInt;
begin
  BlockStart:=BlockSize*Index;
  // first index after the block, clipped to the loop length
  PastEnd:=BlockStart+BlockSize;
  if PastEnd>LoopLength then
    PastEnd:=LoopLength;
  BlockEnd:=PastEnd-1;
end;
{ TProcThread }
// Inserts Self at the head of the doubly linked list that starts at First,
// using the link pair selected by ListType.
procedure TProcThread.AddToList(var First: TProcThread;
  ListType: TMTPThreadList);
var
  OldHead: TProcThread;
begin
  OldHead:=First;
  FNext[ListType]:=OldHead;
  if Assigned(OldHead) then
    OldHead.FPrev[ListType]:=Self;
  First:=Self;
end;
// Unlinks Self from the doubly linked list that starts at First and clears
// the link pair selected by ListType.
procedure TProcThread.RemoveFromList(var First: TProcThread;
  ListType: TMTPThreadList);
var
  Successor, Predecessor: TProcThread;
begin
  Successor:=FNext[ListType];
  Predecessor:=FPrev[ListType];
  // Self was the head: the successor becomes the new head
  if First=Self then
    First:=Successor;
  if Assigned(Successor) then
    Successor.FPrev[ListType]:=Predecessor;
  if Assigned(Predecessor) then
    Predecessor.FNext[ListType]:=Successor;
  FNext[ListType]:=nil;
  FPrev[ListType]:=nil;
end;
// Called by Execute right before the thread ends.
// Detaches the thread from its group (if it still has one, which means an
// exception E ended the task), moves it to the pool's terminated list and
// gives its slot in the pool's thread count back.
//   aPool - pool the thread belongs to
//   E     - exception that ended the thread, or nil on a normal end
procedure TProcThread.Terminating(aPool: TProcThreadPool;
  E: Exception);
begin
  aPool.EnterPoolCriticalSection;
  try
    // remove from group
    if Item.FGroup<>nil then begin
      // an exception occurred
      Item.FGroup.EnterExceptionState(E);
      Item.FGroup.RemoveThread(Self);
      Item.FGroup:=nil;
    end;
    // move to pool's terminated threads
    case Item.FState of
    mtptsActive:   RemoveFromList(aPool.FFirstActiveThread,mtptlPool);
    mtptsInactive: RemoveFromList(aPool.FFirstInactiveThread,mtptlPool);
    end;
    AddToList(aPool.FFirstTerminatedThread,mtptlPool);
    Item.FState:=mtptsTerminated;
    // This thread no longer counts as a pool thread. Without this the pool
    // could never create a replacement once FThreadCount had reached
    // FMaxThreadCount, because FThreadCount is only ever incremented.
    dec(aPool.FThreadCount);
  finally
    aPool.LeavePoolCriticalSection;
  end;
end;
// Creates the thread suspended, together with its item and the event the
// thread waits on while it has no work.
constructor TProcThread.Create;
var
  NewItem: TMultiThreadProcItem;
begin
  inherited Create(true);
  fItem:=TMultiThreadProcItem.Create;
  NewItem:=fItem;
  NewItem.fWaitForPool:=RTLEventCreate;
  NewItem.FThread:=Self;
end;
// Frees the thread's item, then the thread object itself.
destructor TProcThread.Destroy;
begin
  if Assigned(FItem) then
    FreeAndNil(FItem);
  inherited Destroy;
end;
// Thread main loop: runs the task for the item's current index, then takes
// the next index of the same group, joins another group that needs threads,
// or becomes inactive and waits for new work. The loop ends when the thread
// is woken without having been assigned to a group. Any exception ends the
// thread and is handed to Terminating.
procedure TProcThread.Execute;
var
aPool: TProcThreadPool;
Group: TProcThreadGroup;
ok: Boolean;
E: Exception;
begin
MTProcs.CurrentThread:=Self;
// remember the pool now: Item.Group is set to nil further down
aPool:=Item.Group.Pool;
ok:=false;
try
repeat
// work
Group:=Item.Group;
Group.Run(Item.Index,Group.TaskData,Item);
aPool.EnterPoolCriticalSection;
try
Group.IndexComplete(Item.Index);
// find next work
if Group.LastRunningIndex<Group.EndIndex then begin
// next index of group
Group.IncreaseLastRunningIndex(Item);
end else begin
// remove from group
RemoveFromList(Group.FFirstThread,mtptlGroup);
dec(Group.FThreadCount);
Item.FGroup:=nil;
Group:=nil;
if aPool.FFirstGroupNeedThreads<>nil then begin
// add to new group
aPool.FFirstGroupNeedThreads.AddThread(Self);
Group:=Item.Group;
end else begin
// mark inactive
RemoveFromList(aPool.FFirstActiveThread,mtptlPool);
AddToList(aPool.FFirstInactiveThread,mtptlPool);
Item.FState:=mtptsInactive;
// reset while still holding the lock; the wait happens after the lock
// has been released
RTLeventResetEvent(Item.fWaitForPool);
end;
end;
finally
aPool.LeavePoolCriticalSection;
end;
// wait for new work
if Item.FState=mtptsInactive then
RTLeventWaitFor(Item.fWaitForPool);
// woken without a group assigned => leave the loop and terminate
until Item.Group=nil;
ok:=true;
except
// stop the exception and store it
E:=Exception(AcquireExceptionObject);
Terminating(aPool,E);
end;
if ok then
Terminating(aPool,nil);
end;
{ TProcThreadGroup }
// Inserts Self at the head of the group list that starts at First and
// records in FState which list the group is now in.
procedure TProcThreadGroup.AddToList(var First: TProcThreadGroup;
  ListType: TMTPGroupState);
var
  OldHead: TProcThreadGroup;
begin
  OldHead:=First;
  FNext:=OldHead;
  if Assigned(OldHead) then
    OldHead.FPrev:=Self;
  First:=Self;
  FState:=ListType;
end;
// Unlinks Self from the group list that starts at First and resets FState
// to mtpgsNone.
procedure TProcThreadGroup.RemoveFromList(
  var First: TProcThreadGroup);
var
  Successor, Predecessor: TProcThreadGroup;
begin
  Successor:=FNext;
  Predecessor:=FPrev;
  // Self was the head: the successor becomes the new head
  if First=Self then
    First:=Successor;
  if Assigned(Successor) then
    Successor.FPrev:=Predecessor;
  if Assigned(Predecessor) then
    Predecessor.FNext:=Successor;
  FNext:=nil;
  FPrev:=nil;
  FState:=mtpgsNone;
end;
// True while there are indices left to hand out, the group is below its
// thread limit and it is not in exception state.
function TProcThreadGroup.NeedMoreThreads: boolean;
begin
  if FState=mtpgsException then exit(false);
  if FThreadCount>=FMaxThreads then exit(false);
  Result:=FLastRunningIndex<FEndIndex;
end;
// Hands the next unstarted index to Item. If the group does not need more
// threads afterwards and is still listed as needing threads, it is moved to
// the pool's list of finishing groups.
procedure TProcThreadGroup.IncreaseLastRunningIndex(Item: TMultiThreadProcItem);
begin
  inc(FLastRunningIndex);
  Item.FIndex:=FLastRunningIndex;
  if (not NeedMoreThreads) and (FState=mtpgsNeedThreads) then
  begin
    RemoveFromList(Pool.FFirstGroupNeedThreads);
    AddToList(Pool.FFirstGroupFinishing,mtpgsFinishing);
  end;
end;
// Makes AThread a helper of this group and assigns it the next index.
procedure TProcThreadGroup.AddThread(AThread: TProcThread);
var
  ThreadItem: TMultiThreadProcItem;
begin
  ThreadItem:=AThread.Item;
  AThread.AddToList(FFirstThread,mtptlGroup);
  ThreadItem.FGroup:=Self;
  // the count must be up to date before the next index is handed out,
  // because IncreaseLastRunningIndex checks NeedMoreThreads
  inc(FThreadCount);
  IncreaseLastRunningIndex(ThreadItem);
end;
// Takes AThread out of this group's helper list and adjusts the count.
procedure TProcThreadGroup.RemoveThread(AThread: TProcThread);
begin
  FThreadCount:=FThreadCount-1;
  AThread.RemoveFromList(FFirstThread,mtptlGroup);
end;
// Invokes the group's task callback for one index. Precedence: local proc
// with frame, plain procedure, nested procedure, method.
procedure TProcThreadGroup.Run(Index: PtrInt; Data: Pointer;
  Item: TMultiThreadProcItem); inline;
begin
  if Assigned(FTaskFrame) then
  begin
    CallLocalProc(FTaskProcedure,FTaskFrame,Index,Data,Item);
    exit;
  end;
  if Assigned(FTaskProcedure) then
  begin
    FTaskProcedure(Index,Data,Item);
    exit;
  end;
  if Assigned(FTaskNested) then
  begin
    FTaskNested(Index,Data,Item);
    exit;
  end;
  FTaskMethod(Index,Data,Item);
end;
// Records that Index has finished: recomputes FFirstRunningIndex as the
// lowest index among the starter item and all helper threads, ignoring
// helpers that are on the finished Index, then wakes waiting threads.
procedure TProcThreadGroup.IndexComplete(Index: PtrInt);
var
  Cur: TProcThread;
  Lowest, Candidate: PtrInt;
begin
  Lowest:=FStarterItem.Index;
  Cur:=FFirstThread;
  while Assigned(Cur) do
  begin
    Candidate:=Cur.Item.Index;
    if (Candidate<>Index) and (Candidate<Lowest) then
      Lowest:=Candidate;
    Cur:=Cur.FNext[mtptlGroup];
  end;
  FFirstRunningIndex:=Lowest;
  // wake up threads (Note: do this even if FFirstRunningIndex has not changed)
  WakeThreadsWaitingForIndex;
end;
// Wakes items of this group that are blocked in WaitForIndexRange.
// Normal state: only items whose awaited range has finished are woken, with
// state mtptsActive. Exception state: every waiting item is woken with state
// mtptsWaitingFailed. Helper threads are handled first, the starter item last.
procedure TProcThreadGroup.WakeThreadsWaitingForIndex;
var
  GroupFailed: Boolean;

  // Wakes one item if it is waiting and its wait is over.
  procedure WakeIfDue(AnItem: TMultiThreadProcItem);
  begin
    if AnItem.FState<>mtptsWaitingForIndex then exit;
    if GroupFailed then
      AnItem.FState:=mtptsWaitingFailed
    else if HasFinishedIndex(AnItem.WaitingForIndexStart,
                             AnItem.WaitingForIndexEnd) then
      AnItem.FState:=mtptsActive
    else
      exit; // range not finished yet: keep waiting
    RTLeventSetEvent(AnItem.fWaitForPool);
  end;

var
  Cur: TProcThread;
begin
  GroupFailed:=(FState=mtpgsException);
  Cur:=FFirstThread;
  while Assigned(Cur) do
  begin
    WakeIfDue(Cur.Item);
    Cur:=Cur.FNext[mtptlGroup];
  end;
  // the starter thread of this group
  WakeIfDue(FStarterItem);
end;
// Returns true if no index in aStartIndex..aEndIndex is still pending:
// either the whole range lies below FFirstRunningIndex, or all of it has
// been handed out and neither a helper thread nor the starter item is
// currently on an index inside the range.
function TProcThreadGroup.HasFinishedIndex(
  aStartIndex, aEndIndex: PtrInt): boolean;

  // True if AnIndex lies inside the tested range.
  function InTestedRange(AnIndex: PtrInt): boolean;
  begin
    Result:=(AnIndex>=aStartIndex) and (AnIndex<=aEndIndex);
  end;

var
  Cur: TProcThread;
begin
  // test the finished range
  if FFirstRunningIndex>aEndIndex then exit(true);
  // test the unfinished range
  if FLastRunningIndex<aEndIndex then exit(false);
  // test the active range
  Cur:=FFirstThread;
  while Assigned(Cur) do
  begin
    if InTestedRange(Cur.Item.Index) then exit(false);
    Cur:=Cur.FNext[mtptlGroup];
  end;
  Result:=not InTestedRange(FStarterItem.Index);
end;
// Switches the group to exception state: removes it from the pool list it is
// in, stores E in FException and wakes all threads waiting for an index.
//   E - exception object that caused the state change; may be nil.
// Only the first call has an effect. If the group is already in exception
// state, a further exception object cannot be stored; it is freed here,
// otherwise it would be leaked (E is an acquired exception object that
// nothing else references).
procedure TProcThreadGroup.EnterExceptionState(E: Exception);
begin
  if FState=mtpgsException then begin
    // keep the first exception, dispose of the superfluous one
    if E<>FException then
      E.Free; // Free is nil-safe
    exit;
  end;
  case FState of
  mtpgsFinishing:   RemoveFromList(Pool.FFirstGroupFinishing);
  mtpgsNeedThreads: RemoveFromList(Pool.FFirstGroupNeedThreads);
  end;
  FState:=mtpgsException;
  FException:=E;
  WakeThreadsWaitingForIndex;
end;
// Creates the group together with the item used by the thread that starts it.
constructor TProcThreadGroup.Create;
var
  Starter: TMultiThreadProcItem;
begin
  FStarterItem:=TMultiThreadProcItem.Create;
  Starter:=FStarterItem;
  Starter.FGroup:=Self;
end;
// Frees the starter item. FException is not freed here.
destructor TProcThreadGroup.Destroy;
begin
  if Assigned(FStarterItem) then
    FreeAndNil(FStarterItem);
  inherited Destroy;
end;
{ TProcThreadPool }
// Setter of MaxThreadCount. Raises an Exception for values below 1;
// setting the current value again is a no-op.
procedure TProcThreadPool.SetMaxThreadCount(const AValue: PtrInt);
begin
  if AValue<>FMaxThreadCount then
  begin
    if AValue<1 then
      raise Exception.Create('TLightWeightThreadPool.SetMaxThreadCount');
    FMaxThreadCount:=AValue;
  end;
end;
// Frees all threads on the terminated list.
// Each thread is unlinked while holding the pool's critical section, because
// TProcThread.Terminating adds threads to the same list from other threads
// under that lock. This procedure is called without the lock being held.
// The thread object itself is freed after the lock has been released.
procedure TProcThreadPool.CleanTerminatedThreads;
var
  AThread: TProcThread;
begin
  repeat
    EnterPoolCriticalSection;
    try
      AThread:=FFirstTerminatedThread;
      if AThread<>nil then
        AThread.RemoveFromList(FFirstTerminatedThread,mtptlPool);
    finally
      LeavePoolCriticalSection;
    end;
    if AThread=nil then
      break; // list is empty
    AThread.Free;
  until false;
end;
// Creates the pool. The thread limit defaults to GetSystemThreadCount, but
// is never less than 1.
constructor TProcThreadPool.Create;
var
  Detected: PtrInt;
begin
  Detected:=GetSystemThreadCount;
  if Detected<1 then
    Detected:=1;
  FMaxThreadCount:=Detected;
  InitCriticalSection(FCritSection);
end;
// Shuts the pool down: aborts all pending index waits, waits until no thread
// is active any more, wakes the inactive threads so that they terminate,
// waits for that, and finally frees the thread objects.
destructor TProcThreadPool.Destroy;
// Aborts the index wait of the starter item of every group in the list.
procedure WakeWaitingStarterItems(Group: TProcThreadGroup);
begin
while Group<>nil do begin
if Group.StarterItem.FState=mtptsWaitingForIndex then begin
Group.StarterItem.FState:=mtptsWaitingFailed;
RTLeventSetEvent(Group.StarterItem.fWaitForPool);
end;
Group:=Group.FNext;
end;
end;
var
AThread: TProcThread;
begin
// from now on WaitForIndexRange refuses to wait and DoParallelIntern raises
FDestroying:=true;
// wake up all waiting threads
EnterPoolCriticalSection;
try
AThread:=FFirstActiveThread;
while AThread<>nil do begin
if aThread.Item.FState=mtptsWaitingForIndex then begin
aThread.Item.FState:=mtptsWaitingFailed;
RTLeventSetEvent(AThread.Item.fWaitForPool);
end;
AThread:=AThread.FNext[mtptlPool];
end;
WakeWaitingStarterItems(FFirstGroupNeedThreads);
WakeWaitingStarterItems(FFirstGroupFinishing);
finally
LeavePoolCriticalSection;
end;
// wait for all active threads to become inactive
// (polls the list head without holding the lock)
while FFirstActiveThread<>nil do
Sleep(10);
// wake up all inactive threads (without new work they will terminate)
EnterPoolCriticalSection;
try
AThread:=FFirstInactiveThread;
while AThread<>nil do begin
RTLeventSetEvent(AThread.Item.fWaitForPool);
AThread:=AThread.FNext[mtptlPool];
end;
finally
LeavePoolCriticalSection;
end;
// wait for all threads to terminate
// (each thread moves itself to the terminated list in Terminating)
while FFirstInactiveThread<>nil do
Sleep(10);
// free threads
CleanTerminatedThreads;
DoneCriticalsection(FCritSection);
inherited Destroy;
end;
// Acquires the pool's critical section.
procedure TProcThreadPool.EnterPoolCriticalSection;
begin
  System.EnterCriticalSection(FCritSection);
end;
// Releases the pool's critical section.
procedure TProcThreadPool.LeavePoolCriticalSection;
begin
  System.LeaveCriticalSection(FCritSection);
end;
// Runs the method AMethod for every index in StartIndex..EndIndex.
// Does nothing if AMethod is not assigned.
procedure TProcThreadPool.DoParallel(const AMethod: TMTMethod;
  StartIndex, EndIndex: PtrInt; Data: Pointer; MaxThreads: PtrInt);
begin
  if Assigned(AMethod) then
    DoParallelIntern(AMethod,nil,nil,nil,StartIndex,EndIndex,Data,MaxThreads);
end;
// Runs the procedure AProc for every index in StartIndex..EndIndex.
// Does nothing if AProc is not assigned.
procedure TProcThreadPool.DoParallel(const AProc: TMTProcedure;
  StartIndex, EndIndex: PtrInt; Data: Pointer; MaxThreads: PtrInt);
begin
  if Assigned(AProc) then
    DoParallelIntern(nil,AProc,nil,nil,StartIndex,EndIndex,Data,MaxThreads);
end;
// Runs the nested procedure ANested for every index in StartIndex..EndIndex.
// Does nothing if ANested is not assigned.
procedure TProcThreadPool.DoParallelNested(const ANested: TMTNestedProcedure;
  StartIndex, EndIndex: PtrInt; Data: Pointer; MaxThreads: PtrInt);
begin
  if Assigned(ANested) then
    DoParallelIntern(nil,nil,ANested,nil,StartIndex,EndIndex,Data,MaxThreads);
end;
// Experimental: runs the local procedure at address LocalProc for every index
// in StartIndex..EndIndex. Does nothing if LocalProc is nil.
// The frame pointer of the caller is captured and passed along; the tasks
// later invoke LocalProc with that frame via CallLocalProc.
// The interface declaration notes that this method must not be made inline.
procedure TProcThreadPool.DoParallelLocalProc(const LocalProc: Pointer;
StartIndex, EndIndex: PtrInt; Data: Pointer; MaxThreads: PtrInt);
var
Frame: Pointer;
begin
if not Assigned(LocalProc) then exit;
// frame of the routine that called this method
Frame:=get_caller_frame(get_frame);
DoParallelIntern(nil,TMTProcedure(LocalProc),nil,Frame,StartIndex,EndIndex,
Data,MaxThreads);
end;
// Splits a loop of LoopLength iterations into equally sized blocks, one per
// thread of THIS pool where possible.
//   LoopLength   - number of iterations; <=0 yields BlockCount=0, BlockSize=1
//   BlockCount   - out: number of blocks needed to cover the loop
//   BlockSize    - out: iterations per block, at least 1 and at least
//                  MinBlockSize
//   MinBlockSize - lower bound for BlockSize
// The thread limit is read from this instance's MaxThreadCount. It used to be
// read from the global ProcThreadPool, which gave wrong results for any other
// pool instance and failed when the global pool was nil.
procedure TProcThreadPool.CalcBlockSize(LoopLength: PtrInt; out BlockCount,
  BlockSize: PtrInt; MinBlockSize: PtrInt);
begin
  if LoopLength<=0 then begin
    BlockCount:=0;
    BlockSize:=1;
    exit;
  end;
  // split work into equally sized blocks
  BlockCount:=MaxThreadCount; // always >=1, see Create and SetMaxThreadCount
  BlockSize:=(LoopLength div BlockCount);
  if (BlockSize<MinBlockSize) then BlockSize:=MinBlockSize;
  if BlockSize<1 then BlockSize:=1;
  // round up so that the last, possibly shorter, block is counted too
  BlockCount:=((LoopLength-1) div BlockSize)+1;
end;
// Runs the given task for every index in StartIndex..EndIndex (inclusive).
// Exactly one of AMethod, AProc (optionally with AFrame) or ANested is
// expected to be assigned.
//   Data       - passed through to the task unchanged
//   MaxThreads - upper bound of threads to use; <=0 or above MaxThreadCount
//                means MaxThreadCount
// With a single index or a thread limit of 1 the task runs on the calling
// thread only. Otherwise a group is created, helper threads are taken from
// the inactive list or newly created, and the calling thread ("starter")
// works on indices as well. The call returns after all helper threads have
// left the group.
// Raises Exception if the pool is being destroyed. An exception raised by the
// task on the calling thread propagates to the caller; an exception raised in
// a helper thread is re-raised here after the group has finished.
procedure TProcThreadPool.DoParallelIntern(const AMethod: TMTMethod;
  const AProc: TMTProcedure; const ANested: TMTNestedProcedure;
  const AFrame: Pointer; StartIndex, EndIndex: PtrInt; Data: Pointer;
  MaxThreads: PtrInt);
var
  Group: TProcThreadGroup;
  Index: PtrInt;
  AThread: TProcThread;
  NewThread: Boolean;
  Item: TMultiThreadProcItem;
  HelperThreadException: Exception;
  StarterFinished: Boolean; // false while in the try block => an exception is propagating
begin
  if (StartIndex>EndIndex) then exit; // nothing to do
  if FDestroying then raise Exception.Create('Pool destroyed');
  if (MaxThreads>MaxThreadCount) or (MaxThreads<=0) then
    MaxThreads:=MaxThreadCount;
  if (StartIndex=EndIndex) or (MaxThreads<=1) then begin
    // single threaded
    Item:=TMultiThreadProcItem.Create;
    try
      for Index:=StartIndex to EndIndex do begin
        Item.FIndex:=Index;
        if Assigned(AFrame) then
          CallLocalProc(AProc,AFrame,Index,Data,Item)
        else if Assigned(AProc) then
          AProc(Index,Data,Item)
        else if Assigned(AMethod) then
          AMethod(Index,Data,Item)
        else
          ANested(Index,Data,Item);
      end;
    finally
      Item.Free;
    end;
    exit;
  end;

  // create a new group
  Group:=TProcThreadGroup.Create;
  Group.FPool:=Self;
  Group.FTaskData:=Data;
  Group.FTaskMethod:=AMethod;
  Group.FTaskProcedure:=AProc;
  Group.FTaskNested:=ANested;
  Group.FTaskFrame:=AFrame;
  Group.FStartIndex:=StartIndex;
  Group.FEndIndex:=EndIndex;
  Group.FFirstRunningIndex:=StartIndex;
  Group.FLastRunningIndex:=StartIndex;
  Group.FMaxThreads:=MaxThreads;
  Group.FThreadCount:=1; // the calling thread
  Group.FStarterItem.FState:=mtptsActive;
  Group.FStarterItem.FIndex:=StartIndex;
  HelperThreadException:=nil;
  StarterFinished:=false;
  try
    // start threads
    EnterPoolCriticalSection;
    try
      Group.AddToList(FFirstGroupNeedThreads,mtpgsNeedThreads);
      while Group.NeedMoreThreads do begin
        AThread:=FFirstInactiveThread;
        NewThread:=false;
        if AThread<>nil then begin
          AThread.RemoveFromList(FFirstInactiveThread,mtptlPool);
        end else if FThreadCount<FMaxThreadCount then begin
          AThread:=TProcThread.Create;
          if Assigned(AThread.FatalException) then
            raise AThread.FatalException;
          NewThread:=true;
          inc(FThreadCount);
        end else begin
          break;
        end;
        // add to Group
        Group.AddThread(AThread);
        // start thread
        AThread.AddToList(FFirstActiveThread,mtptlPool);
        AThread.Item.FState:=mtptsActive;
        if NewThread then
          AThread.Start
        else
          RTLeventSetEvent(AThread.Item.fWaitForPool);
      end;
    finally
      LeavePoolCriticalSection;
    end;

    // run until no more Index left
    Index:=StartIndex;
    repeat
      Group.FStarterItem.FIndex:=Index;
      Group.Run(Index,Data,Group.FStarterItem);

      EnterPoolCriticalSection;
      try
        Group.IndexComplete(Index);
        if (Group.FLastRunningIndex<Group.EndIndex) and (Group.FState<>mtpgsException)
        then begin
          inc(Group.FLastRunningIndex);
          Index:=Group.FLastRunningIndex;
        end else begin
          // StartIndex is used as the "no more work" marker
          Index:=StartIndex;
        end;
      finally
        LeavePoolCriticalSection;
      end;
    until Index=StartIndex;
    StarterFinished:=true;
  finally
    // wait for Group to finish
    if Group.FFirstThread<>nil then begin
      EnterPoolCriticalSection;
      try
        Group.FStarterItem.FState:=mtptsInactive;
        Group.FStarterItem.fIndex:=EndIndex;// needed for Group.HasFinishedIndex
        // wake threads waiting for starter thread to finish
        // If the starter did not finish normally, an exception is on its way
        // out of the try block: put the group into exception state so that
        // waiting threads are woken with a failed wait instead of being told
        // that the starter's index completed. (The previous test compared
        // FState with the value assigned two lines above and was never true.)
        if not StarterFinished then
          Group.EnterExceptionState(nil)
        else
          Group.WakeThreadsWaitingForIndex;
      finally
        LeavePoolCriticalSection;
      end;
      // waiting with exponential spin lock
      Index:=0;
      while Group.FFirstThread<>nil do begin
        sleep(Index);
        Index:=Index*2+1;
        if Index>30 then Index:=30;
      end;
    end;
    // remove group from pool
    EnterPoolCriticalSection;
    try
      case Group.FState of
      mtpgsNeedThreads: Group.RemoveFromList(FFirstGroupNeedThreads);
      mtpgsFinishing: Group.RemoveFromList(FFirstGroupFinishing);
      end;
    finally
      LeavePoolCriticalSection;
    end;
    HelperThreadException:=Group.FException;
    Group.Free;
    // When the starter's own exception is propagating, the raise below is
    // never reached; free a stored helper exception instead of leaking it.
    if not StarterFinished then
      FreeAndNil(HelperThreadException);
    // free terminated threads (terminated, because of exceptions)
    CleanTerminatedThreads;
  end;
  // if the exception occurred in a helper thread raise it now
  if HelperThreadException<>nil then
    raise HelperThreadException;
end;
// Unit start-up: create the default pool and clear CurrentThread for the
// thread that runs the initialization section.
initialization
ProcThreadPool:=TProcThreadPool.Create;
CurrentThread:=nil;
// Unit shutdown: destroy the default pool (its destructor waits for the
// worker threads to terminate), then clear the global reference.
finalization
ProcThreadPool.Free;
ProcThreadPool:=nil;
end.