{
 **********************************************************************
  This file is part of the Free Pascal run time library.

  See the file COPYING.FPC, included in this distribution,
  for details about the license.
 **********************************************************************

  Unit for light weight threads.

  Copyright (C) 2008 Mattias Gaertner mattias@freepascal.org

  Abstract:
    Light weight threads.
    This unit provides methods to easily run a procedure/method with several
    threads at once.
}
unit MTProcs;

{$mode objfpc}{$H+}

{$inline on}

interface

uses
  Classes, SysUtils, MTPCPU;

type
  TProcThreadGroup = class;
  TProcThreadPool = class;
  TProcThread = class;

  { TMultiThreadProcItem }

  TMTPThreadState = (
    mtptsNone,
    mtptsActive,
    mtptsWaitingForIndex,
    mtptsWaitingFailed,
    mtptsInactive,
    mtptsTerminated
    );

  TMultiThreadProcItem = class
  private
    FGroup: TProcThreadGroup;
    FIndex: PtrInt;
    FThread: TProcThread;
    FWaitingForIndexEnd: PtrInt;
    FWaitingForIndexStart: PtrInt;
    fWaitForPool: PRTLEvent;
    FState: TMTPThreadState;
  public
    destructor Destroy; override;
    function WaitForIndexRange(StartIndex, EndIndex: PtrInt): boolean;
    function WaitForIndex(Index: PtrInt): boolean; inline;
    procedure CalcBlock(Index, BlockSize, LoopLength: PtrInt;
                        out BlockStart, BlockEnd: PtrInt); inline;
    property Index: PtrInt read FIndex;
    property Group: TProcThreadGroup read FGroup;
    property WaitingForIndexStart: PtrInt read FWaitingForIndexStart;
    property WaitingForIndexEnd: PtrInt read FWaitingForIndexEnd;
    property Thread: TProcThread read FThread;
  end;

  { TProcThread }

  TMTPThreadList = (
    mtptlPool,
    mtptlGroup
    );

  TProcThread = class(TThread)
  private
    FItem: TMultiThreadProcItem;
    FNext, FPrev: array[TMTPThreadList] of TProcThread;
    procedure AddToList(var First: TProcThread; ListType: TMTPThreadList); inline;
    procedure RemoveFromList(var First: TProcThread; ListType: TMTPThreadList); inline;
    procedure Terminating(aPool: TProcThreadPool; E: Exception);
  public
    constructor Create;
    destructor Destroy; override;
    procedure Execute; override;
    property Item: TMultiThreadProcItem read FItem;
  end;

  TMTMethod = procedure(Index: PtrInt; Data: Pointer;
                        Item: TMultiThreadProcItem) of object;
  TMTProcedure = procedure(Index: PtrInt; Data: Pointer;
                           Item: TMultiThreadProcItem);

  { TProcThreadGroup
    Each task creates a new group of threads.
    A group can either need more threads or it has finished and waits for its
    threads to end.
    The thread that created the group is not in the list FFirstThread. }

  TMTPGroupState = (
    mtpgsNone,
    mtpgsNeedThreads, // the groups waiting for more threads to help
    mtpgsFinishing,   // the groups waiting for its threads to finish
    mtpgsException    // there was an exception => close asap
    );

  TProcThreadGroup = class
  private
    FEndIndex: PtrInt;
    FException: Exception;
    FFirstRunningIndex: PtrInt;
    FFirstThread: TProcThread;
    FLastRunningIndex: PtrInt;
    FMaxThreads: PtrInt;
    FNext, FPrev: TProcThreadGroup;
    FPool: TProcThreadPool;
    FStarterItem: TMultiThreadProcItem;
    FStartIndex: PtrInt;
    FState: TMTPGroupState;
    FTaskData: Pointer;
    FTaskFrame: Pointer;
    FTaskMethod: TMTMethod;
    FTaskProcedure: TMTProcedure;
    FThreadCount: PtrInt;
    procedure AddToList(var First: TProcThreadGroup; ListType: TMTPGroupState); inline;
    procedure RemoveFromList(var First: TProcThreadGroup); inline;
    function NeedMoreThreads: boolean; inline;
    procedure IncreaseLastRunningIndex(Item: TMultiThreadProcItem);
    procedure AddThread(AThread: TProcThread);
    procedure RemoveThread(AThread: TProcThread); inline;
    procedure Run(Index: PtrInt; Data: Pointer; Item: TMultiThreadProcItem); inline;
    procedure IndexComplete(Index: PtrInt);
    procedure WakeThreadsWaitingForIndex;
    function HasFinishedIndex(aStartIndex, aEndIndex: PtrInt): boolean;
    procedure EnterExceptionState(E: Exception);
  public
    constructor Create;
    destructor Destroy; override;
    property Pool: TProcThreadPool read FPool;
    property StartIndex: PtrInt read FStartIndex;
    property EndIndex: PtrInt read FEndIndex;
    property FirstRunningIndex: PtrInt read FFirstRunningIndex; // first started
    property LastRunningIndex: PtrInt read FLastRunningIndex; // last started
    property TaskData: Pointer read FTaskData;
    property TaskMethod: TMTMethod read FTaskMethod;
    property TaskProcedure: TMTProcedure read FTaskProcedure;
    property TaskFrame: Pointer read FTaskFrame;
    property MaxThreads: PtrInt read FMaxThreads;
    property StarterItem: TMultiThreadProcItem read FStarterItem;
  end;

  { TProcThreadPool
    Owns the helper threads, kept in three lists (active, inactive,
    terminated), and tracks the groups that still need helper threads or are
    finishing. }

  TProcThreadPool = class
  private
    FMaxThreadCount: PtrInt;
    FThreadCount: PtrInt;
    FFirstInactiveThread: TProcThread;
    FFirstActiveThread: TProcThread;
    FFirstTerminatedThread: TProcThread;
    FFirstGroupNeedThreads: TProcThreadGroup;
    FFirstGroupFinishing: TProcThreadGroup;
    FCritSection: TRTLCriticalSection;
    FDestroying: boolean;
    procedure SetMaxThreadCount(const AValue: PtrInt);
    procedure CleanTerminatedThreads;
    procedure DoParallelIntern(const AMethod: TMTMethod;
                               const AProc: TMTProcedure;
                               const AFrame: Pointer;
                               StartIndex, EndIndex: PtrInt;
                               Data: Pointer = nil; MaxThreads: PtrInt = 0);
  public
    // for debugging only: the critical section is public:
    procedure EnterPoolCriticalSection; inline;
    procedure LeavePoolCriticalSection; inline;
  public
    constructor Create;
    destructor Destroy; override;

    procedure DoParallel(const AMethod: TMTMethod;
                         StartIndex, EndIndex: PtrInt;
                         Data: Pointer = nil; MaxThreads: PtrInt = 0); inline;
    procedure DoParallel(const AProc: TMTProcedure;
                         StartIndex, EndIndex: PtrInt;
                         Data: Pointer = nil; MaxThreads: PtrInt = 0); inline;

    // experimental
    procedure DoParallelLocalProc(const LocalProc: Pointer;
                         StartIndex, EndIndex: PtrInt;
                         Data: Pointer = nil; MaxThreads: PtrInt = 0); // do not make this inline!

    // utility functions for loops:
    procedure CalcBlockSize(LoopLength: PtrInt;
                         out BlockCount, BlockSize: PtrInt;
                         MinBlockSize: PtrInt = 0); inline;
  public
    property MaxThreadCount: PtrInt read FMaxThreadCount write SetMaxThreadCount;
    property ThreadCount: PtrInt read FThreadCount;
  end;

var
  ProcThreadPool: TProcThreadPool = nil;

threadvar
  CurrentThread: TThread; // TProcThread sets this, you can set this for your own TThreads descendants

implementation

{ TMultiThreadProcItem }

destructor TMultiThreadProcItem.Destroy;
begin
  if fWaitForPool<>nil then begin
    RTLeventdestroy(fWaitForPool);
    fWaitForPool:=nil;
  end;
  inherited Destroy;
end;

{ Blocks until all indices StartIndex..EndIndex of this item's group have
  finished.
  Returns true if the range has finished or is empty.
  Returns false if:
  - EndIndex is not below this item's own Index,
  - the group is in exception state,
  - the pool is being destroyed,
  - or the wait was aborted (the item was woken with mtptsWaitingFailed). }
function TMultiThreadProcItem.WaitForIndexRange(
  StartIndex, EndIndex: PtrInt): boolean;
var
  aPool: TProcThreadPool;
begin
  if (EndIndex>=Index) then exit(false); // cannot wait for itself or later indices
  if EndIndex<StartIndex then exit(true); // empty range
  // A single threaded run has no group object and executes the indices in
  // ascending order, so every lower index is already done.
  if Group=nil then exit(true);
  // multi threaded group
  aPool:=Group.Pool;
  if aPool.FDestroying then exit(false); // no more wait allowed
  aPool.EnterPoolCriticalSection;
  try
    if Group.FState=mtpgsException then
      exit(false); // group is being closed
    if Group.HasFinishedIndex(StartIndex,EndIndex) then
      exit(true); // range already finished
    FState:=mtptsWaitingForIndex;
    FWaitingForIndexStart:=StartIndex;
    FWaitingForIndexEnd:=EndIndex;
    RTLeventResetEvent(fWaitForPool);
  finally
    aPool.LeavePoolCriticalSection;
  end;
  // woken by TProcThreadGroup.WakeThreadsWaitingForIndex or the pool
  // destructor, which set FState to mtptsActive (success) or
  // mtptsWaitingFailed (aborted) before signalling
  RTLeventWaitFor(fWaitForPool);
  Result:=FState=mtptsActive;
  FWaitingForIndexStart:=0;
  FWaitingForIndexEnd:=0;
end;

{ Waits until the single index Index has finished; see WaitForIndexRange. }
function TMultiThreadProcItem.WaitForIndex(Index: PtrInt): boolean; inline;
begin
  Result:=WaitForIndexRange(Index,Index);
end;

{ Computes the inclusive range BlockStart..BlockEnd of block number Index when
  a loop of LoopLength items (0..LoopLength-1) is split into blocks of
  BlockSize items. The last block is clipped to LoopLength-1. }
procedure TMultiThreadProcItem.CalcBlock(Index, BlockSize, LoopLength: PtrInt;
  out BlockStart, BlockEnd: PtrInt);
begin
  BlockStart:=BlockSize*Index;
  BlockEnd:=BlockStart+BlockSize;
  if LoopLength<BlockEnd then BlockEnd:=LoopLength;
  dec(BlockEnd);
end;

{ TProcThread }

{ Inserts Self at the head of the doubly linked list First of kind ListType. }
procedure TProcThread.AddToList(var First: TProcThread;
  ListType: TMTPThreadList);
begin
  FNext[ListType]:=First;
  if FNext[ListType]<>nil then
    FNext[ListType].FPrev[ListType]:=Self;
  First:=Self;
end;

{ Unlinks Self from the doubly linked list First of kind ListType. }
procedure TProcThread.RemoveFromList(var First: TProcThread;
  ListType: TMTPThreadList);
begin
  if First=Self then
    First:=FNext[ListType];
  if FNext[ListType]<>nil then
    FNext[ListType].FPrev[ListType]:=FPrev[ListType];
  if FPrev[ListType]<>nil then
    FPrev[ListType].FNext[ListType]:=FNext[ListType];
  FNext[ListType]:=nil;
  FPrev[ListType]:=nil;
end;

{ Called by Execute when the thread ends, either normally (E=nil) or because
  of an exception (E = acquired exception object, ownership passes here).
  Leaves the current group and moves the thread to the pool's terminated
  list. }
procedure TProcThread.Terminating(aPool: TProcThreadPool; E: Exception);
begin
  aPool.EnterPoolCriticalSection;
  try
    // remove from group
    if Item.FGroup<>nil then begin
      // an exception occurred: the group stores (or frees) E
      Item.FGroup.EnterExceptionState(E);
      Item.FGroup.RemoveThread(Self);
      Item.FGroup:=nil;
    end else
      E.Free; // no group to report it to (Free is a no-op for nil)
    // move to pool's terminated threads
    case Item.FState of
    mtptsActive, mtptsWaitingForIndex, mtptsWaitingFailed:
      // a thread doing or waiting for work is in the active list
      RemoveFromList(aPool.FFirstActiveThread,mtptlPool);
    mtptsInactive:
      RemoveFromList(aPool.FFirstInactiveThread,mtptlPool);
    end;
    AddToList(aPool.FFirstTerminatedThread,mtptlPool);
    Item.FState:=mtptsTerminated;
  finally
    aPool.LeavePoolCriticalSection;
  end;
end;

constructor TProcThread.Create;
begin
  inherited Create(true); // created suspended, started by the pool
  fItem:=TMultiThreadProcItem.Create;
  fItem.fWaitForPool:=RTLEventCreate;
  fItem.FThread:=Self;
end;

destructor TProcThread.Destroy;
begin
  FreeAndNil(FItem);
  inherited Destroy;
end;

{ Helper thread main loop. The thread:
  - runs the current index of its group,
  - then claims the next index of the same group,
  - or joins another group that needs threads,
  - or becomes inactive and sleeps until the pool hands it new work.
  It ends when it is woken without a group. }
procedure TProcThread.Execute;
var
  aPool: TProcThreadPool;
  Group: TProcThreadGroup;
  ok: Boolean;
  E: Exception;
begin
  MTProcs.CurrentThread:=Self;
  aPool:=Item.Group.Pool;
  ok:=false;
  try
    repeat
      // work
      Group:=Item.Group;
      Group.Run(Item.Index,Group.TaskData,Item);

      aPool.EnterPoolCriticalSection;
      try
        Group.IndexComplete(Item.Index);

        // find next work
        if (Group.LastRunningIndex<Group.EndIndex)
        and (Group.FState<>mtpgsException) then begin
          // next index of the same group
          Item.FState:=mtptsActive;
          Group.IncreaseLastRunningIndex(Item);
        end else begin
          // leave the group
          Group.RemoveThread(Self);
          Item.FGroup:=nil;
          if aPool.FFirstGroupNeedThreads<>nil then begin
            // help another group
            Item.FState:=mtptsActive;
            aPool.FFirstGroupNeedThreads.AddThread(Self);
          end else begin
            // mark inactive
            RemoveFromList(aPool.FFirstActiveThread,mtptlPool);
            AddToList(aPool.FFirstInactiveThread,mtptlPool);
            Item.FState:=mtptsInactive;
            RTLeventResetEvent(Item.fWaitForPool);
          end;
        end;
      finally
        aPool.LeavePoolCriticalSection;
      end;
      // wait for new work; the pool assigns a group before signalling, or
      // signals without a group to end the thread
      if Item.FState=mtptsInactive then
        RTLeventWaitFor(Item.fWaitForPool);
    until Item.Group=nil;
    ok:=true;
  except
    // stop the exception and store it
    E:=Exception(AcquireExceptionObject);
    Terminating(aPool,E);
  end;
  if ok then
    Terminating(aPool,nil);
end;

{ TProcThreadGroup }

{ Inserts the group at the head of the pool list First and sets its state to
  ListType. }
procedure TProcThreadGroup.AddToList(var First: TProcThreadGroup;
  ListType: TMTPGroupState);
begin
  FNext:=First;
  if FNext<>nil then
    FNext.FPrev:=Self;
  First:=Self;
  FState:=ListType;
end;

{ Unlinks the group from the pool list First and resets its state to
  mtpgsNone. }
procedure TProcThreadGroup.RemoveFromList(
  var First: TProcThreadGroup);
begin
  if First=Self then
    First:=FNext;
  if FNext<>nil then
    FNext.FPrev:=FPrev;
  if FPrev<>nil then
    FPrev.FNext:=FNext;
  FNext:=nil;
  FPrev:=nil;
  FState:=mtpgsNone;
end;

{ True while:
  - there are unclaimed indices,
  - the thread limit is not reached,
  - and no exception occurred. }
function TProcThreadGroup.NeedMoreThreads: boolean;
begin
  Result:=(FLastRunningIndex<FEndIndex) and (FThreadCount<FMaxThreads)
    and (FState<>mtpgsException);
end;

{ Claims the next index for Item. Once the group no longer needs helpers,
  it is moved from the "need threads" list to the "finishing" list.
  Must be called inside the pool critical section. }
procedure TProcThreadGroup.IncreaseLastRunningIndex(Item: TMultiThreadProcItem);
begin
  inc(FLastRunningIndex);
  Item.FIndex:=FLastRunningIndex;
  if NeedMoreThreads then exit;
  if FState=mtpgsNeedThreads then begin
    RemoveFromList(Pool.FFirstGroupNeedThreads);
    AddToList(Pool.FFirstGroupFinishing,mtpgsFinishing);
  end;
end;

{ Adds a helper thread to this group and gives it the next index. }
procedure TProcThreadGroup.AddThread(AThread: TProcThread);
begin
  AThread.Item.FGroup:=Self;
  AThread.AddToList(FFirstThread,mtptlGroup);
  inc(FThreadCount);
  IncreaseLastRunningIndex(AThread.Item);
end;

procedure TProcThreadGroup.RemoveThread(AThread: TProcThread);
begin
  AThread.RemoveFromList(FFirstThread,mtptlGroup);
  dec(FThreadCount);
end;

{ Calls the task for one index: local procedure (with frame), plain
  procedure or method. }
procedure TProcThreadGroup.Run(Index: PtrInt; Data: Pointer;
  Item: TMultiThreadProcItem); inline;
begin
  if Assigned(FTaskFrame) then begin
    CallLocalProc(FTaskProcedure,FTaskFrame,Index,Data,Item)
  end else begin
    if Assigned(FTaskProcedure) then
      FTaskProcedure(Index,Data,Item)
    else
      FTaskMethod(Index,Data,Item)
  end;
end;

{ Marks Index as done: recomputes FFirstRunningIndex as the lowest index
  still being run (ignoring helper threads whose index is Index) and wakes
  waiting items. Must be called inside the pool critical section. }
procedure TProcThreadGroup.IndexComplete(Index: PtrInt);
var
  AThread: TProcThread;
  NewFirstRunningThread: PtrInt;
begin
  // update FirstRunningIndex
  NewFirstRunningThread:=FStarterItem.Index;
  AThread:=FFirstThread;
  while AThread<>nil do begin
    if (NewFirstRunningThread>aThread.Item.Index)
    and (aThread.Item.Index<>Index) then
      NewFirstRunningThread:=aThread.Item.Index;
    aThread:=aThread.FNext[mtptlGroup];
  end;
  FFirstRunningIndex:=NewFirstRunningThread;
  // wake up threads (Note: do this even if FFirstRunningIndex has not changed)
  WakeThreadsWaitingForIndex;
end;

{ Wakes waiting items.
  - Normal state: wakes items whose waited range has finished, with
    mtptsActive.
  - Exception state: wakes all waiting items with mtptsWaitingFailed.
  Must be called inside the pool critical section. }
procedure TProcThreadGroup.WakeThreadsWaitingForIndex;
var
  aThread: TProcThread;
begin
  if FState<>mtpgsException then begin
    // wake up waiting threads
    aThread:=FFirstThread;
    while aThread<>nil do begin
      if (aThread.Item.FState=mtptsWaitingForIndex)
      and HasFinishedIndex(aThread.Item.WaitingForIndexStart,
                           aThread.Item.WaitingForIndexEnd)
      then begin
        // wake up the thread
        aThread.Item.FState:=mtptsActive;
        RTLeventSetEvent(aThread.Item.fWaitForPool);
      end;
      aThread:=aThread.FNext[mtptlGroup];
    end;
    if (FStarterItem.FState=mtptsWaitingForIndex)
    and HasFinishedIndex(FStarterItem.WaitingForIndexStart,
                         FStarterItem.WaitingForIndexEnd)
    then begin
      // wake up the starter thread of this group
      FStarterItem.FState:=mtptsActive;
      RTLeventSetEvent(FStarterItem.fWaitForPool);
    end;
  end else begin
    // end group: wake up waiting threads
    aThread:=FFirstThread;
    while aThread<>nil do begin
      if (aThread.Item.FState=mtptsWaitingForIndex) then begin
        // end group: wake up the thread
        aThread.Item.FState:=mtptsWaitingFailed;
        RTLeventSetEvent(aThread.Item.fWaitForPool);
      end;
      aThread:=aThread.FNext[mtptlGroup];
    end;
    if (FStarterItem.FState=mtptsWaitingForIndex) then begin
      // end group: wake up the starter thread of this group
      FStarterItem.FState:=mtptsWaitingFailed;
      RTLeventSetEvent(FStarterItem.fWaitForPool);
    end;
  end;
end;

{ True if every index in aStartIndex..aEndIndex has finished: all of them
  were claimed and none is still being run by a helper or the starter. }
function TProcThreadGroup.HasFinishedIndex(
  aStartIndex, aEndIndex: PtrInt): boolean;
var
  AThread: TProcThread;
begin
  // test the finished range
  if FFirstRunningIndex>aEndIndex then exit(true);
  // test the unfinished range
  if FLastRunningIndex<aEndIndex then exit(false);
  // test the active range
  AThread:=FFirstThread;
  while AThread<>nil do begin
    if (AThread.Item.Index>=aStartIndex)
    and (AThread.Item.Index<=aEndIndex) then
      exit(false);
    AThread:=AThread.FNext[mtptlGroup];
  end;
  if (FStarterItem.Index>=aStartIndex)
  and (FStarterItem.Index<=aEndIndex) then
    exit(false);
  Result:=true;
end;

{ Switches the group into exception state:
  - detaches it from the pool lists,
  - stores E (only the first exception is kept),
  - releases all waiting items.
  Later exceptions are freed. Must be called inside the pool critical
  section. }
procedure TProcThreadGroup.EnterExceptionState(E: Exception);
begin
  if FState=mtpgsException then begin
    // keep the first exception; free later ones instead of leaking them
    if E<>FException then
      E.Free;
    exit;
  end;
  case FState of
  mtpgsFinishing: RemoveFromList(Pool.FFirstGroupFinishing);
  mtpgsNeedThreads: RemoveFromList(Pool.FFirstGroupNeedThreads);
  end;
  FState:=mtpgsException;
  FException:=E;
  WakeThreadsWaitingForIndex;
end;

constructor TProcThreadGroup.Create;
begin
  FStarterItem:=TMultiThreadProcItem.Create;
  FStarterItem.FGroup:=Self;
  // the starter thread may wait for indices too (WaitForIndexRange) and is
  // woken via this event; TMultiThreadProcItem.Destroy releases it
  FStarterItem.fWaitForPool:=RTLEventCreate;
end;

destructor TProcThreadGroup.Destroy;
begin
  FreeAndNil(FStarterItem);
  inherited Destroy;
end;

{ TProcThreadPool }

procedure TProcThreadPool.SetMaxThreadCount(const AValue: PtrInt);
begin
  if FMaxThreadCount=AValue then exit;
  if AValue<1 then raise Exception.Create('TLightWeightThreadPool.SetMaxThreadCount');
  FMaxThreadCount:=AValue;
end;

{ Frees all threads in the terminated list and decrements ThreadCount for each
  of them, so the pool can create replacement threads. The list is detached
  under the pool lock; the threads are freed outside of it, because freeing a
  TThread may wait for it to end. }
procedure TProcThreadPool.CleanTerminatedThreads;
var
  AThread: TProcThread;
begin
  while true do begin
    EnterPoolCriticalSection;
    try
      AThread:=FFirstTerminatedThread;
      if AThread<>nil then begin
        AThread.RemoveFromList(FFirstTerminatedThread,mtptlPool);
        dec(FThreadCount);
      end;
    finally
      LeavePoolCriticalSection;
    end;
    if AThread=nil then exit;
    AThread.Free;
  end;
end;

constructor TProcThreadPool.Create;
begin
  FMaxThreadCount:=GetSystemThreadCount;
  if FMaxThreadCount<1 then FMaxThreadCount:=1;
  InitCriticalSection(FCritSection);
end;

{ Shuts the pool down in four steps:
  1. Aborts all pending index waits.
  2. Waits until no thread is active.
  3. Wakes the inactive threads without work, so they terminate, and waits
     for them to end.
  4. Frees the threads. }
destructor TProcThreadPool.Destroy;

  procedure WakeWaitingStarterItems(Group: TProcThreadGroup);
  begin
    while Group<>nil do begin
      if Group.StarterItem.FState=mtptsWaitingForIndex then begin
        Group.StarterItem.FState:=mtptsWaitingFailed;
        RTLeventSetEvent(Group.StarterItem.fWaitForPool);
      end;
      Group:=Group.FNext;
    end;
  end;

var
  AThread: TProcThread;
begin
  FDestroying:=true;
  // wake up all waiting threads
  EnterPoolCriticalSection;
  try
    AThread:=FFirstActiveThread;
    while AThread<>nil do begin
      if aThread.Item.FState=mtptsWaitingForIndex then begin
        aThread.Item.FState:=mtptsWaitingFailed;
        RTLeventSetEvent(AThread.Item.fWaitForPool);
      end;
      AThread:=AThread.FNext[mtptlPool];
    end;
    WakeWaitingStarterItems(FFirstGroupNeedThreads);
    WakeWaitingStarterItems(FFirstGroupFinishing);
  finally
    LeavePoolCriticalSection;
  end;

  // wait for all active threads to become inactive
  while FFirstActiveThread<>nil do
    Sleep(10);

  // wake up all inactive threads (without new work they will terminate)
  EnterPoolCriticalSection;
  try
    AThread:=FFirstInactiveThread;
    while AThread<>nil do begin
      RTLeventSetEvent(AThread.Item.fWaitForPool);
      AThread:=AThread.FNext[mtptlPool];
    end;
  finally
    LeavePoolCriticalSection;
  end;

  // wait for all threads to terminate
  while FFirstInactiveThread<>nil do
    Sleep(10);

  // free threads
  CleanTerminatedThreads;

  DoneCriticalsection(FCritSection);
  inherited Destroy;
end;

procedure TProcThreadPool.EnterPoolCriticalSection;
begin
  EnterCriticalsection(FCritSection);
end;

procedure TProcThreadPool.LeavePoolCriticalSection;
begin
  LeaveCriticalsection(FCritSection);
end;

procedure TProcThreadPool.DoParallel(const AMethod: TMTMethod;
  StartIndex, EndIndex: PtrInt; Data: Pointer; MaxThreads: PtrInt);
begin
  if not Assigned(AMethod) then exit;
  DoParallelIntern(AMethod,nil,nil,StartIndex,EndIndex,Data,MaxThreads);
end;

procedure TProcThreadPool.DoParallel(const AProc: TMTProcedure;
  StartIndex, EndIndex: PtrInt; Data: Pointer; MaxThreads: PtrInt);
begin
  if not Assigned(AProc) then exit;
  DoParallelIntern(nil,AProc,nil,StartIndex,EndIndex,Data,MaxThreads);
end;

procedure TProcThreadPool.DoParallelLocalProc(const LocalProc: Pointer;
  StartIndex, EndIndex: PtrInt; Data: Pointer; MaxThreads: PtrInt);
var
  Frame: Pointer;
begin
  if not Assigned(LocalProc) then exit;
  // frame of the routine that called DoParallelLocalProc, i.e. the parent of
  // the local procedure (this is why this method must not be inlined)
  Frame:=get_caller_frame(get_frame);
  DoParallelIntern(nil,TMTProcedure(LocalProc),Frame,StartIndex,EndIndex,
                   Data,MaxThreads);
end;

{ Splits LoopLength items into BlockCount blocks of BlockSize items. The
  block size is based on this pool's MaxThreadCount and is at least
  MinBlockSize and at least 1. For LoopLength<=0: BlockCount=0,
  BlockSize=1. }
procedure TProcThreadPool.CalcBlockSize(LoopLength: PtrInt; out BlockCount,
  BlockSize: PtrInt; MinBlockSize: PtrInt);
begin
  if LoopLength<=0 then begin
    BlockCount:=0;
    BlockSize:=1;
    exit;
  end;
  // split work into equally sized blocks
  BlockCount:=MaxThreadCount;
  BlockSize:=(LoopLength div BlockCount);
  if (BlockSize<MinBlockSize) then BlockSize:=MinBlockSize;
  // with fewer items than threads the division above yields 0, which would
  // make the division below fail
  if BlockSize<1 then BlockSize:=1;
  BlockCount:=((LoopLength-1) div BlockSize)+1;
end;

{ Runs the task for every index StartIndex..EndIndex.
  - If only one thread is allowed (or there is one index), the calling thread
    runs all indices in order.
  - Otherwise it creates a group, recruits helper threads and works on
    indices itself as the "starter".
  - It then waits for the helpers. An exception from a helper is re-raised
    here; an exception from the starter itself propagates directly. }
procedure TProcThreadPool.DoParallelIntern(const AMethod: TMTMethod;
  const AProc: TMTProcedure; const AFrame: Pointer;
  StartIndex, EndIndex: PtrInt; Data: Pointer; MaxThreads: PtrInt);
var
  Group: TProcThreadGroup;
  Index, NextIndex, SleepMs: PtrInt;
  AThread: TProcThread;
  NewThread: Boolean;
  Item: TMultiThreadProcItem;
  HelperThreadException: Exception;
  StarterFinished: Boolean;
begin
  if (StartIndex>EndIndex) then exit; // nothing to do
  if FDestroying then raise Exception.Create('Pool destroyed');

  if (MaxThreads>MaxThreadCount) or (MaxThreads<=0) then
    MaxThreads:=MaxThreadCount;
  if (StartIndex=EndIndex) or (MaxThreads<=1) then begin
    // single threaded
    Item:=TMultiThreadProcItem.Create;
    try
      for Index:=StartIndex to EndIndex do begin
        Item.FIndex:=Index;
        if Assigned(AFrame) then begin
          CallLocalProc(AProc,AFrame,Index,Data,Item)
        end else begin
          if Assigned(AProc) then
            AProc(Index,Data,Item)
          else
            AMethod(Index,Data,Item)
        end;
      end;
    finally
      Item.Free;
    end;
    exit;
  end;

  // create a new group
  Group:=TProcThreadGroup.Create;
  Group.FPool:=Self;
  Group.FTaskData:=Data;
  Group.FTaskMethod:=AMethod;
  Group.FTaskProcedure:=AProc;
  Group.FTaskFrame:=AFrame;
  Group.FStartIndex:=StartIndex;
  Group.FEndIndex:=EndIndex;
  Group.FFirstRunningIndex:=StartIndex;
  Group.FLastRunningIndex:=StartIndex;
  Group.FMaxThreads:=MaxThreads;
  Group.FThreadCount:=1;
  Group.FStarterItem.FState:=mtptsActive;
  Group.FStarterItem.FIndex:=StartIndex;
  HelperThreadException:=nil;
  StarterFinished:=false;
  try
    // start threads
    EnterPoolCriticalSection;
    try
      Group.AddToList(FFirstGroupNeedThreads,mtpgsNeedThreads);
      while Group.NeedMoreThreads do begin
        AThread:=FFirstInactiveThread;
        NewThread:=false;
        if AThread<>nil then begin
          AThread.RemoveFromList(FFirstInactiveThread,mtptlPool);
        end else if FThreadCount<FMaxThreadCount then begin
          AThread:=TProcThread.Create;
          if Assigned(AThread.FatalException) then
            raise AThread.FatalException;
          NewThread:=true;
          inc(FThreadCount);
        end else begin
          break;
        end;
        // add to Group
        Group.AddThread(AThread);
        // start thread
        AThread.AddToList(FFirstActiveThread,mtptlPool);
        AThread.Item.FState:=mtptsActive;
        if NewThread then
          {$IF defined(VER2_4_2) or defined(VER2_4_3)}
          AThread.Resume
          {$ELSE}
          AThread.Start
          {$ENDIF}
        else
          RTLeventSetEvent(AThread.Item.fWaitForPool);
      end;
    finally
      LeavePoolCriticalSection;
    end;

    // run until no more Index left
    Index:=StartIndex;
    repeat
      Group.Run(Index,Data,Group.FStarterItem);

      EnterPoolCriticalSection;
      try
        // Pick the starter's next index before completing the current one,
        // so the starter no longer appears to run the finished index while
        // waiting items are checked.
        if (Group.FLastRunningIndex<Group.EndIndex)
        and (Group.FState<>mtpgsException) then begin
          // also moves the group to the finishing list when the last index
          // is claimed, so no helper can join it beyond EndIndex
          Group.IncreaseLastRunningIndex(Group.FStarterItem);
          NextIndex:=Group.FStarterItem.FIndex;
        end else begin
          NextIndex:=StartIndex; // no more work
          // EndIndex is never inside a waited range (waits are always below
          // the waiter's own index), so the starter blocks no waiter
          Group.FStarterItem.FIndex:=EndIndex;
        end;
        Group.IndexComplete(Index);
      finally
        LeavePoolCriticalSection;
      end;
      Index:=NextIndex;
    until Index=StartIndex;
    StarterFinished:=true;
  finally
    EnterPoolCriticalSection;
    try
      // detach the group from the pool, so that no more helpers join it
      case Group.FState of
      mtpgsNeedThreads: Group.RemoveFromList(FFirstGroupNeedThreads);
      mtpgsFinishing: Group.RemoveFromList(FFirstGroupFinishing);
      end;
      Group.FStarterItem.FState:=mtptsInactive;
      Group.FStarterItem.FIndex:=EndIndex; // needed for Group.HasFinishedIndex
      if StarterFinished then
        // wake threads waiting for the starter thread to finish
        Group.WakeThreadsWaitingForIndex
      else
        // the starter raised: its current index will never complete, so
        // release all waiting helpers and stop handing out indices
        Group.EnterExceptionState(nil);
    finally
      LeavePoolCriticalSection;
    end;

    // wait for the helper threads to leave the group (exponential backoff)
    SleepMs:=0;
    while Group.FFirstThread<>nil do begin
      Sleep(SleepMs);
      SleepMs:=SleepMs*2+1;
      if SleepMs>30 then SleepMs:=30;
    end;

    HelperThreadException:=Group.FException;
    Group.Free;
    // the starter's own exception is propagating and wins; free the helper's
    if not StarterFinished then
      FreeAndNil(HelperThreadException);
    // free terminated threads (terminated, because of exceptions)
    CleanTerminatedThreads;
  end;
  // if the exception occurred in a helper thread raise it now
  if HelperThreadException<>nil then
    raise HelperThreadException;
end;

initialization
  ProcThreadPool:=TProcThreadPool.Create;
  CurrentThread:=nil;

finalization
  ProcThreadPool.Free;
  ProcThreadPool:=nil;

end.