added mtprocs

git-svn-id: trunk@38214 -
This commit is contained in:
mattias 2012-08-09 17:02:12 +00:00
parent bd89c5bdc7
commit 96d9a3c292
15 changed files with 2024 additions and 0 deletions

14
.gitattributes vendored
View File

@ -2036,6 +2036,19 @@ components/mrumenu/mrumanager.pp svneol=native#text/plain
components/mrumenu/reglazmru.pp svneol=native#text/plain
components/mrumenu/regmru.lrs svneol=native#text/plain
components/mrumenu/tmrumenumanager.png -text svneol=unset#image/png
components/multithreadprocs/examples/parallelloop1.lpi svneol=native#text/plain
components/multithreadprocs/examples/parallelloop1.lpr svneol=native#text/plain
components/multithreadprocs/examples/recursivemtp1.lpi svneol=native#text/plain
components/multithreadprocs/examples/recursivemtp1.lpr svneol=native#text/plain
components/multithreadprocs/examples/simplemtp1.lpi svneol=native#text/plain
components/multithreadprocs/examples/simplemtp1.lpr svneol=native#text/plain
components/multithreadprocs/examples/testmtp1.lpi svneol=native#text/plain
components/multithreadprocs/examples/testmtp1.lpr svneol=native#text/plain
components/multithreadprocs/mtpcpu.pas svneol=native#text/plain
components/multithreadprocs/mtprocs.pas svneol=native#text/plain
components/multithreadprocs/mtputils.pas svneol=native#text/plain
components/multithreadprocs/multithreadprocslaz.lpk svneol=native#text/plain
components/multithreadprocs/multithreadprocslaz.pas svneol=native#text/plain
components/opengl/example/imgui.lpi svneol=native#text/plain
components/opengl/example/imgui.pas svneol=native#text/pascal
components/opengl/example/imgui.res -text
@ -6371,6 +6384,7 @@ packager/globallinks/lrcodereport-1.0.lpl svneol=native#text/plain
packager/globallinks/macosfiles-0.lpl svneol=native#text/plain
packager/globallinks/memdslaz-1.2.1.lpl svneol=native#text/plain
packager/globallinks/messagecomposerpkg-0.lpl svneol=native#text/plain
packager/globallinks/multithreadprocslaz-1.2.1.lpl svneol=native#text/plain
packager/globallinks/notlcldesigner-0.lpl svneol=native#text/plain
packager/globallinks/pochecker-1.lpl svneol=native#text/plain
packager/globallinks/prettymessages-0.lpl svneol=native#text/plain

View File

@ -0,0 +1,50 @@
<?xml version="1.0"?>
<CONFIG>
<ProjectOptions>
<Version Value="7"/>
<General>
<Flags>
<LRSInOutputDirectory Value="False"/>
</Flags>
<SessionStorage Value="InProjectDir"/>
<MainUnit Value="0"/>
<TargetFileExt Value=""/>
<Title Value="parallelloop1"/>
</General>
<VersionInfo>
<ProjectVersion Value=""/>
</VersionInfo>
<PublishOptions>
<Version Value="2"/>
<IgnoreBinaries Value="False"/>
<IncludeFileFilter Value="*.(pas|pp|inc|lfm|lpr|lrs|lpi|lpk|sh|xml)"/>
<ExcludeFileFilter Value="*.(bak|ppu|ppw|o|so);*~;backup"/>
</PublishOptions>
<RunParams>
<local>
<FormatVersion Value="1"/>
<LaunchingApplication PathPlusParams="/usr/X11R6/bin/xterm -T 'Lazarus Run Output' -e $(LazarusDir)/tools/runwait.sh $(TargetCmdLine)"/>
</local>
</RunParams>
<RequiredPackages Count="1">
<Item1>
<PackageName Value="MultiThreadProcsLaz"/>
<MinVersion Valid="True"/>
<DefaultFilename Value="../multithreadprocslaz.lpk"/>
</Item1>
</RequiredPackages>
<Units Count="1">
<Unit0>
<Filename Value="parallelloop1.lpr"/>
<IsPartOfProject Value="True"/>
<UnitName Value="ParallelLoop1"/>
</Unit0>
</Units>
</ProjectOptions>
<CompilerOptions>
<Version Value="8"/>
<Other>
<CompilerPath Value="$(CompPath)"/>
</Other>
</CompilerOptions>
</CONFIG>

View File

@ -0,0 +1,107 @@
{ Example for a parallel loop with MTProcs.
Copyright (C) 2009 Mattias Gaertner mattias@freepascal.org
This library is free software; you can redistribute it and/or modify it
under the terms of the GNU Library General Public License as published by
the Free Software Foundation; either version 2 of the License, or (at your
option) any later version with the following modification:
As a special exception, the copyright holders of this library give you
permission to link this library with independent modules to produce an
executable, regardless of the license terms of these independent modules,and
to copy and distribute the resulting executable under terms of your choice,
provided that you also meet, for each linked independent module, the terms
and conditions of the license of that module. An independent module is a
module which is not derived from or based on this library. If you modify
this library, you may extend this exception to your version of the library,
but you are not obligated to do so. If you do not wish to do so, delete this
exception statement from your version.
This program is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU Library General Public License
for more details.
You should have received a copy of the GNU Library General Public License
along with this library; if not, write to the Free Software Foundation,
Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
}
program ParallelLoop1;
{$mode objfpc}{$H+}
uses
{$IFDEF UNIX}
cthreads, cmem,
{$ENDIF}
Classes, SysUtils, MTProcs;
type
TFindBestData = record
List: TList;
Value: Pointer;
BlockCount: integer;
Results: array of integer;
end;
PFindBestData = ^TFindBestData;
procedure FindBestParallel(Index: PtrInt; Data: Pointer; Item: TMultiThreadProcItem);
var
i: integer;
begin
with PFindBestData(Data)^ do begin
Results[Index]:=-1;
i:=Index;
while i<List.Count-1 do begin
if List[i]=Value then // hier wuerde die teure Vergleichsoperation stehen
Results[Index]:=i;
inc(i,BlockCount);
end;
end;
end;
function FindBest(aList: TList; aValue: Pointer): integer;
var
Index: integer;
Data: TFindBestData;
begin
with Data do begin
List:=aList;
Value:=aValue;
BlockCount:=ProcThreadPool.MaxThreadCount;
SetLength(Results,BlockCount);
ProcThreadPool.DoParallel(@FindBestParallel,0,BlockCount-1,@Data);
// Ergebnisse zusammenfassen
Result:=-1;
for Index:=0 to BlockCount-1 do
if Results[Index]>=0 then
Result:=Results[Index];
end;
end;
function FindBest1(List: TList; Value: Pointer): integer;
var
i: integer;
begin
Result:=-1;
i:=0;
while i<List.Count do begin
if List[i]=Value then // hier wuerde die teure Vergleichsoperation stehen
Result:=i;
inc(i);
end;
end;
var
List: TList;
i: Integer;
begin
List:=TList.Create;
for i:=0 to 100000000 do
List.Add(Pointer(i));
i:=FindBest(List,Pointer(9999));
//i:=FindBest1(List,Pointer(9999));
writeln('i=',i);
end.

View File

@ -0,0 +1,50 @@
<?xml version="1.0"?>
<CONFIG>
<ProjectOptions>
<Version Value="7"/>
<General>
<Flags>
<LRSInOutputDirectory Value="False"/>
</Flags>
<SessionStorage Value="InProjectDir"/>
<MainUnit Value="0"/>
<TargetFileExt Value=""/>
<Title Value="recursivemtp1"/>
</General>
<VersionInfo>
<ProjectVersion Value=""/>
</VersionInfo>
<PublishOptions>
<Version Value="2"/>
<IgnoreBinaries Value="False"/>
<IncludeFileFilter Value="*.(pas|pp|inc|lfm|lpr|lrs|lpi|lpk|sh|xml)"/>
<ExcludeFileFilter Value="*.(bak|ppu|ppw|o|so);*~;backup"/>
</PublishOptions>
<RunParams>
<local>
<FormatVersion Value="1"/>
<LaunchingApplication PathPlusParams="/usr/X11R6/bin/xterm -T 'Lazarus Run Output' -e $(LazarusDir)/tools/runwait.sh $(TargetCmdLine)"/>
</local>
</RunParams>
<RequiredPackages Count="1">
<Item1>
<PackageName Value="MultiThreadProcsLaz"/>
<MinVersion Valid="True"/>
<DefaultFilename Value="../multithreadprocslaz.lpk"/>
</Item1>
</RequiredPackages>
<Units Count="1">
<Unit0>
<Filename Value="recursivemtp1.lpr"/>
<IsPartOfProject Value="True"/>
<UnitName Value="RecursiveMTP1"/>
</Unit0>
</Units>
</ProjectOptions>
<CompilerOptions>
<Version Value="8"/>
<Other>
<CompilerPath Value="$(CompPath)"/>
</Other>
</CompilerOptions>
</CONFIG>

View File

@ -0,0 +1,49 @@
program RecursiveMTP1;
{$mode objfpc}{$H+}
uses
{$IFDEF UNIX}
cthreads, cmem,
{$ENDIF}
MTProcs;
type
TArrayOfInteger = array of integer;
var
Items: TArrayOfInteger;
type
TFindMaximumParallelData = record
Items: TArrayOfInteger;
Left, Middle, Right: integer;
LeftMaxIndex, RightMaxIndex: integer;
end;
PFindMaximumParallelData = ^TFindMaximumParallelData;
procedure FindMaximumParallel(Index: PtrInt; Data: Pointer; Item: TMultiThreadProcItem);
var
Params: PFindMaximumParallelData absolute Data;
LeftParams, RightParams: TFindMaximumParallelData;
begin
if Params^.Left+1000>Params^.Right then begin
// compute the maximum of the few remaining items
Params^.LeftMaxIndex:=Params^.Items[Params^.Left];
for i:=Params^.Left+1 to Params^.Right do
if Params^.Items[i]>Params^.LeftMaxIndex then
end else begin
end;
end;
function FindMaximumIndex(Items: TArrayOfInteger): integer;
begin
end;
begin
SetLength(Items,10000000);
for i:=0 to length(Items)-1 do Items[i]:=Random(1000);
ProcThreadPool.DoParallel(@DoSomethingParallel,1,5,nil); // address, startindex, endindex, optional data
end.

View File

@ -0,0 +1,51 @@
<?xml version="1.0"?>
<CONFIG>
<ProjectOptions>
<PathDelim Value="/"/>
<Version Value="7"/>
<General>
<Flags>
<LRSInOutputDirectory Value="False"/>
</Flags>
<SessionStorage Value="InProjectDir"/>
<MainUnit Value="0"/>
<TargetFileExt Value=""/>
<Title Value="simplemtp1"/>
</General>
<VersionInfo>
<ProjectVersion Value=""/>
</VersionInfo>
<PublishOptions>
<Version Value="2"/>
<IgnoreBinaries Value="False"/>
<IncludeFileFilter Value="*.(pas|pp|inc|lfm|lpr|lrs|lpi|lpk|sh|xml)"/>
<ExcludeFileFilter Value="*.(bak|ppu|ppw|o|so);*~;backup"/>
</PublishOptions>
<RunParams>
<local>
<FormatVersion Value="1"/>
<LaunchingApplication PathPlusParams="/usr/X11R6/bin/xterm -T 'Lazarus Run Output' -e $(LazarusDir)/tools/runwait.sh $(TargetCmdLine)"/>
</local>
</RunParams>
<RequiredPackages Count="1">
<Item1>
<PackageName Value="MultiThreadProcsLaz"/>
<MinVersion Valid="True"/>
<DefaultFilename Value="../multithreadprocslaz.lpk"/>
</Item1>
</RequiredPackages>
<Units Count="1">
<Unit0>
<Filename Value="simplemtp1.lpr"/>
<IsPartOfProject Value="True"/>
<UnitName Value="SimpleMTP1"/>
</Unit0>
</Units>
</ProjectOptions>
<CompilerOptions>
<Version Value="8"/>
<Other>
<CompilerPath Value="$(CompPath)"/>
</Other>
</CompilerOptions>
</CONFIG>

View File

@ -0,0 +1,23 @@
program SimpleMTP1;
{$mode objfpc}{$H+}
uses
{$IFDEF UNIX}
cthreads, cmem,
{$ENDIF}
MTProcs;
// a simple parallel procedure
procedure DoSomethingParallel(Index: PtrInt; Data: Pointer; Item: TMultiThreadProcItem);
var
i: Integer;
begin
writeln(Index);
for i:=1 to Index*1000000 do ; // do some work
end;
begin
ProcThreadPool.DoParallel(@DoSomethingParallel,1,5,nil); // address, startindex, endindex, optional data
end.

View File

@ -0,0 +1,46 @@
<?xml version="1.0"?>
<CONFIG>
<ProjectOptions>
<PathDelim Value="/"/>
<Version Value="6"/>
<General>
<SessionStorage Value="InProjectDir"/>
<MainUnit Value="0"/>
<TargetFileExt Value=""/>
</General>
<VersionInfo>
<ProjectVersion Value=""/>
</VersionInfo>
<PublishOptions>
<Version Value="2"/>
<IgnoreBinaries Value="False"/>
<IncludeFileFilter Value="*.(pas|pp|inc|lfm|lpr|lrs|lpi|lpk|sh|xml)"/>
<ExcludeFileFilter Value="*.(bak|ppu|ppw|o|so);*~;backup"/>
</PublishOptions>
<RunParams>
<local>
<FormatVersion Value="1"/>
<LaunchingApplication PathPlusParams="/usr/X11R6/bin/xterm -T 'Lazarus Run Output' -e $(LazarusDir)/tools/runwait.sh $(TargetCmdLine)"/>
</local>
</RunParams>
<RequiredPackages Count="1">
<Item1>
<PackageName Value="MultiThreadProcsLaz"/>
<MinVersion Valid="True"/>
<DefaultFilename Value="../multithreadprocslaz.lpk"/>
</Item1>
</RequiredPackages>
<Units Count="1">
<Unit0>
<Filename Value="testmtp1.lpr"/>
<IsPartOfProject Value="True"/>
</Unit0>
</Units>
</ProjectOptions>
<CompilerOptions>
<Version Value="8"/>
<Other>
<CompilerPath Value="$(CompPath)"/>
</Other>
</CompilerOptions>
</CONFIG>

View File

@ -0,0 +1,367 @@
program TestMTP1;
{$mode objfpc}{$H+}
uses
{$IFDEF UNIX}
cthreads, cmem,
{$ENDIF}
Math, SysUtils, Classes, MTProcs, MTPUtils, MultiThreadProcsLaz;
type
{ TTestItem }
TTestItem = class
private
FIndex: int64;
public
property Index: int64 read FIndex;
constructor Create(NewIndex: int64);
end;
{ TTests }
TTests = class
public
procedure Work(Seconds: integer);
// RTLeventSetEvent, RTLeventWaitFor
procedure TestRTLevent_Set_WaitFor;
// single thread test
procedure TestSingleThread;
procedure MTPLoop_TestSingleThread(Index: PtrInt; Data: Pointer;
Item: TMultiThreadProcItem);
// two threads test: run once
procedure TestTwoThreads1;
procedure MTPLoop_TestTwoThreads1(Index: PtrInt; Data: Pointer;
Item: TMultiThreadProcItem);
// 0 runs two seconds,
// 1 runs a second then waits for 0 then runs a second
// 2 runs a second then waits for 1
// 3 waits for 0
// 4 waits for 1
// 5 waits for 2
procedure TestMTPWaitForIndex;
procedure MTPLoop_TestMTPWaitForIndex(Index: PtrInt; Data: Pointer;
Item: TMultiThreadProcItem);
// two threads test: various run times
procedure TestMTPTwoThreads2;
procedure MTPLoop_TestTwoThreads2(Index: PtrInt; Data: Pointer;
Item: TMultiThreadProcItem);
// test exception in starter thread
procedure TestMTPExceptionInStarterThread;
procedure MTPLoop_TestExceptionInStarterThread(Index: PtrInt; Data: Pointer;
Item: TMultiThreadProcItem);
// test exception in helper thread
procedure TestMTPExceptionInHelperThread;
procedure MTPLoop_TestExceptionInHelperThread(Index: PtrInt; Data: Pointer;
Item: TMultiThreadProcItem);
// test parallel sort
procedure TestMTPSort;
procedure MTPLoop_TestDoubleMTPSort(Index: PtrInt; Data: Pointer;
Item: TMultiThreadProcItem);
end;
{ TTestItem }
constructor TTestItem.Create(NewIndex: int64);
begin
FIndex:=NewIndex;
end;
{ TTests }
procedure TTests.Work(Seconds: integer);
var
Start: TDateTime;
begin
Start:=Now;
while (Now-Start)*86400<Seconds do if GetCurrentDir='' then ;
end;
procedure TTests.TestRTLevent_Set_WaitFor;
var
e: PRTLEvent;
begin
e:=RTLEventCreate;
RTLeventSetEvent(e);
RTLeventWaitFor(e);
RTLeventdestroy(e);
end;
procedure TTests.TestSingleThread;
begin
ProcThreadPool.DoParallel(@MTPLoop_TestSingleThread,1,3,nil,1);
end;
procedure TTests.MTPLoop_TestSingleThread(Index: PtrInt; Data: Pointer;
Item: TMultiThreadProcItem);
begin
writeln('TTests.MTPLoop_TestSingleThread Index=',Index);
end;
procedure TTests.TestTwoThreads1;
begin
WriteLn('TTests.TestTwoThreads1 START');
ProcThreadPool.DoParallel(@MTPLoop_TestTwoThreads1,1,2,nil,2);
WriteLn('TTests.TestTwoThreads1 END');
end;
procedure TTests.MTPLoop_TestTwoThreads1(Index: PtrInt; Data: Pointer;
Item: TMultiThreadProcItem);
var
i: Integer;
begin
for i:=1 to 3 do begin
WriteLn('TTests.MTPLoop_TestTwoThreads1 Index=',Index,' ',i);
Work(1);
end;
end;
procedure TTests.TestMTPWaitForIndex;
var
IndexStates: PInteger;
begin
ProcThreadPool.MaxThreadCount:=8;
IndexStates:=nil;
GetMem(IndexStates,SizeOf(Integer)*10);
FillByte(IndexStates^,SizeOf(Integer)*10,0);
WriteLn('TTests.TestMTPWaitForIndex START');
ProcThreadPool.DoParallel(@MTPLoop_TestMTPWaitForIndex,0,5,IndexStates);
FreeMem(IndexStates);
WriteLn('TTests.TestMTPWaitForIndex END');
end;
procedure TTests.MTPLoop_TestMTPWaitForIndex(Index: PtrInt; Data: Pointer;
Item: TMultiThreadProcItem);
// 0 runs two seconds,
// 1 runs a second then waits for 0 then runs a second
// 2 runs a second then waits for 1
// 3 waits for 0
// 4 waits for 1
// 5 waits for 2
procedure WaitFor(OtherIndex: PtrInt);
begin
WriteLn('TTests.MTPLoop_TestMTPWaitForIndex Index='+IntToStr(Index)+' waiting for '+IntToStr(OtherIndex)+' ...');
Item.WaitForIndex(OtherIndex);
WriteLn('TTests.MTPLoop_TestMTPWaitForIndex Index='+IntToStr(Index)+' waited for '+IntToStr(OtherIndex)+'. working ...');
if PInteger(Data)[OtherIndex]<>2 then begin
WriteLn('TTests.MTPLoop_TestMTPWaitForIndex Index='+IntToStr(Index)+' ERROR: waited for '+IntToStr(OtherIndex)+' failed: OtherState='+IntToStr(PInteger(Data)[OtherIndex]));
end;
end;
begin
WriteLn('TTests.MTPLoop_TestMTPWaitForIndex Index='+IntToStr(Index)+' START');
if PInteger(Data)[Index]<>0 then begin
WriteLn('TTests.MTPLoop_TestMTPWaitForIndex Index='+IntToStr(Index)+' ERROR: IndexState='+IntToStr(PInteger(Data)[Index]));
end;
PInteger(Data)[Index]:=1;
case Index of
0: Work(2);
1:begin
Work(1);
WaitFor(0);
Work(1);
end;
2:begin
Work(1);
WaitFor(1);
end;
3:begin
WaitFor(0);
end;
4:begin
WaitFor(1);
end;
5:begin
WaitFor(2);
end;
end;
WriteLn('TTests.MTPLoop_TestMTPWaitForIndex Index='+IntToStr(Index)+' END');
PInteger(Data)[Index]:=2;
end;
procedure TTests.TestMTPTwoThreads2;
begin
WriteLn('TTests.TestMTPTwoThreads1 START');
ProcThreadPool.DoParallel(@MTPLoop_TestTwoThreads2,1,6,nil,2);
WriteLn('TTests.TestMTPTwoThreads1 END');
end;
procedure TTests.MTPLoop_TestTwoThreads2(Index: PtrInt; Data: Pointer;
Item: TMultiThreadProcItem);
var
i: Integer;
begin
for i:=1 to (Index mod 3)+1 do begin
WriteLn('TTests.MTPLoop_TestTwoThreads1 Index=',Index,' i=',i,' ID=',PtrUint(GetThreadID));
Work(1);
end;
end;
type
TMyException = class(Exception);
procedure TTests.TestMTPExceptionInStarterThread;
var
IndexStates: PInteger;
begin
WriteLn('TTests.TestMTPExceptionInStarterThread START');
ProcThreadPool.MaxThreadCount:=8;
IndexStates:=nil;
GetMem(IndexStates,SizeOf(Integer)*10);
FillByte(IndexStates^,SizeOf(Integer)*10,0);
try
ProcThreadPool.DoParallel(@MTPLoop_TestExceptionInStarterThread,1,3,IndexStates,2);
except
on E: Exception do begin
WriteLn('TTests.TestMTPExceptionInHelperThread E.ClassName=',E.ClassName,' E.Message=',E.Message);
end;
end;
FreeMem(IndexStates);
WriteLn('TTests.TestMTPExceptionInStarterThread END');
end;
procedure TTests.MTPLoop_TestExceptionInStarterThread(Index: PtrInt;
Data: Pointer; Item: TMultiThreadProcItem);
begin
WriteLn('TTests.MTPLoop_TestExceptionInStarterThread START Index='+IntToStr(Index));
if PInteger(Data)[Index]<>0 then
WriteLn('TTests.MTPLoop_TestExceptionInStarterThread Index='+IntToStr(Index)+' ERROR: IndexState='+IntToStr(PInteger(Data)[Index]));
PInteger(Data)[Index]:=1;
case Index of
1:
begin
// Main Thread
Work(1);
WriteLn('TTests.MTPLoop_TestExceptionInStarterThread raising exception in Index='+IntToStr(Index)+' ...');
raise Exception.Create('Exception in starter thread');
end;
else
Work(Index);
end;
PInteger(Data)[Index]:=2;
WriteLn('TTests.MTPLoop_TestExceptionInStarterThread END Index='+IntToStr(Index));
end;
procedure TTests.TestMTPExceptionInHelperThread;
var
IndexStates: PInteger;
begin
WriteLn('TTests.TestMTPExceptionInHelperThread START');
ProcThreadPool.MaxThreadCount:=8;
IndexStates:=nil;
GetMem(IndexStates,SizeOf(Integer)*10);
FillByte(IndexStates^,SizeOf(Integer)*10,0);
try
ProcThreadPool.DoParallel(@MTPLoop_TestExceptionInHelperThread,1,3,IndexStates,2);
except
on E: Exception do begin
WriteLn('TTests.TestMTPExceptionInHelperThread E.ClassName=',E.ClassName,' E.Message=',E.Message);
end;
end;
FreeMem(IndexStates);
WriteLn('TTests.TestMTPExceptionInHelperThread END');
end;
procedure TTests.MTPLoop_TestExceptionInHelperThread(Index: PtrInt;
Data: Pointer; Item: TMultiThreadProcItem);
begin
WriteLn('TTests.MTPLoop_TestExceptionInHelperThread START Index='+IntToStr(Index));
if PInteger(Data)[Index]<>0 then
WriteLn('TTests.MTPLoop_TestExceptionInHelperThread Index='+IntToStr(Index)+' ERROR: IndexState='+IntToStr(PInteger(Data)[Index]));
PInteger(Data)[Index]:=1;
case Index of
2:
begin
// Helper Thread 2
Work(1);
WriteLn('TTests.MTPLoop_TestExceptionInHelperThread raising exception in Index='+IntToStr(Index)+' ...');
raise TMyException.Create('Exception in helper thread');
end;
else
Work(Index+1);
end;
PInteger(Data)[Index]:=2;
WriteLn('TTests.MTPLoop_TestExceptionInHelperThread END Index='+IntToStr(Index));
end;
function CompareTestItems(Data1, Data2: Pointer): integer;
begin
if TTestItem(Data1).Index>TTestItem(Data2).Index then
Result:=1
else if TTestItem(Data1).Index<TTestItem(Data2).Index then
Result:=-1
else
Result:=0;
end;
procedure TTests.TestMTPSort;
var
OuterLoop: Integer;
InnerLoop: Integer;
begin
OuterLoop:=1;
InnerLoop:=0;
if Paramcount=1 then begin
InnerLoop:=StrToInt(ParamStr(1));
end else if Paramcount=2 then begin
OuterLoop:=StrToInt(ParamStr(1));
InnerLoop:=StrToInt(ParamStr(2));
end;
writeln('TTests.TestMTPSort Running ',OuterLoop,'x',InnerLoop);
ProcThreadPool.DoParallel(@MTPLoop_TestDoubleMTPSort,1,OuterLoop,@InnerLoop);
end;
procedure TTests.MTPLoop_TestDoubleMTPSort(Index: PtrInt; Data: Pointer;
Item: TMultiThreadProcItem);
var
i: Integer;
List: TFPList;
t: double;
begin
// create an unsorted list of values
List:=TFPList.Create;
for i:=1 to 10000000 do List.Add(TTestItem.Create(Random(99999999999)));
//QuickSort(List,0,List.Count-1,@AnsiCompareText);
t:=Now;
ParallelSortFPList(List,@CompareTestItems,PInteger(Data)^);
t:=Now-t;
writeln('TTests.TestMTPSort ',t*86400);
// check
sleep(1);
for i:=0 to List.Count-2 do
if CompareTestItems(List[i],List[i+1])>0 then raise Exception.Create('not sorted');
for i:=0 to List.Count-1 do
TObject(List[i]).Free;
List.Free;
end;
var
Tests: TTests;
begin
writeln('threads=',ProcThreadPool.MaxThreadCount);
ProcThreadPool.MaxThreadCount:=8;
Tests:=TTests.Create;
//Tests.Test1;
//Tests.Test2;
//Tests.TestTwoThreads2;
//Tests.TestRTLevent_Set_WaitFor;
//Tests.TestMTPWaitForIndex;
//Tests.TestMTPExceptionInStarterThread;
Tests.TestMTPExceptionInHelperThread;
//Tests.TestMTPSort;
Tests.Free;
end.

View File

@ -0,0 +1,104 @@
{ System depending code for light weight threads.
This file is part of the Free Pascal run time library.
Copyright (C) 2008 Mattias Gaertner mattias@freepascal.org
See the file COPYING.FPC, included in this distribution,
for details about the copyright.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
**********************************************************************}
unit MTPCPU;
{$mode objfpc}{$H+}
{$inline on}
interface
{$IF defined(windows)}
uses Windows;
{$ELSEIF defined(freebsd) or defined(darwin)}
uses ctypes, sysctl;
{$ELSEIF defined(linux)}
{$linklib c}
uses ctypes;
{$ENDIF}
function GetSystemThreadCount: integer;
procedure CallLocalProc(AProc, Frame: Pointer; Param1: PtrInt;
Param2, Param3: Pointer); inline;
implementation
{$IFDEF Linux}
const _SC_NPROCESSORS_ONLN = 83;
function sysconf(i: cint): clong; cdecl; external name 'sysconf';
{$ENDIF}
function GetSystemThreadCount: integer;
// returns a good default for the number of threads on this system
{$IF defined(windows)}
//returns total number of processors available to system including logical hyperthreaded processors
var
i: Integer;
ProcessAffinityMask, SystemAffinityMask: DWORD;
Mask: DWORD;
SystemInfo: SYSTEM_INFO;
begin
if GetProcessAffinityMask(GetCurrentProcess, ProcessAffinityMask, SystemAffinityMask)
then begin
Result := 0;
for i := 0 to 31 do begin
Mask := DWord(1) shl i;
if (ProcessAffinityMask and Mask)<>0 then
inc(Result);
end;
end else begin
//can't get the affinity mask so we just report the total number of processors
GetSystemInfo(SystemInfo);
Result := SystemInfo.dwNumberOfProcessors;
end;
end;
{$ELSEIF defined(UNTESTEDsolaris)}
begin
t = sysconf(_SC_NPROC_ONLN);
end;
{$ELSEIF defined(freebsd) or defined(darwin)}
var
mib: array[0..1] of cint;
len: cint;
t: cint;
begin
mib[0] := CTL_HW;
mib[1] := HW_NCPU;
len := sizeof(t);
fpsysctl(pchar(@mib), 2, @t, @len, Nil, 0);
Result:=t;
end;
{$ELSEIF defined(linux)}
begin
Result:=sysconf(_SC_NPROCESSORS_ONLN);
end;
{$ELSE}
begin
Result:=1;
end;
{$ENDIF}
procedure CallLocalProc(AProc, Frame: Pointer; Param1: PtrInt;
Param2, Param3: Pointer); inline;
type
PointerLocal = procedure(_EBP: Pointer; Param1: PtrInt;
Param2, Param3: Pointer);
begin
PointerLocal(AProc)(Frame, Param1, Param2, Param3);
end;
end.

View File

@ -0,0 +1,879 @@
{ Unit for light weight threads.
This file is part of the Free Pascal run time library.
Copyright (C) 2008 Mattias Gaertner mattias@freepascal.org
See the file COPYING.FPC, included in this distribution,
for details about the copyright.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
**********************************************************************}
{
Abstract:
Light weight threads.
This unit provides methods to easily run a procedure/method with several
threads at once.
}
unit MTProcs;
{$mode objfpc}{$H+}
{$inline on}
interface
uses
Classes, SysUtils, MTPCPU;
type
TProcThreadGroup = class;
TProcThreadPool = class;
TProcThread = class;
{ TMultiThreadProcItem }
TMTPThreadState = (
mtptsNone,
mtptsActive,
mtptsWaitingForIndex,
mtptsWaitingFailed,
mtptsInactive,
mtptsTerminated
);
TMultiThreadProcItem = class
private
FGroup: TProcThreadGroup;
FIndex: PtrInt;
FThread: TProcThread;
FWaitingForIndexEnd: PtrInt;
FWaitingForIndexStart: PtrInt;
fWaitForPool: PRTLEvent;
FState: TMTPThreadState;
public
destructor Destroy; override;
function WaitForIndexRange(StartIndex, EndIndex: PtrInt): boolean;
function WaitForIndex(Index: PtrInt): boolean; inline;
procedure CalcBlock(Index, BlockSize, LoopLength: PtrInt;
out BlockStart, BlockEnd: PtrInt); inline;
property Index: PtrInt read FIndex;
property Group: TProcThreadGroup read FGroup;
property WaitingForIndexStart: PtrInt read FWaitingForIndexStart;
property WaitingForIndexEnd: PtrInt read FWaitingForIndexEnd;
property Thread: TProcThread read FThread;
end;
{ TProcThread }
TMTPThreadList = (
mtptlPool,
mtptlGroup
);
TProcThread = class(TThread)
private
FItem: TMultiThreadProcItem;
FNext, FPrev: array[TMTPThreadList] of TProcThread;
procedure AddToList(var First: TProcThread; ListType: TMTPThreadList); inline;
procedure RemoveFromList(var First: TProcThread; ListType: TMTPThreadList); inline;
procedure Terminating(aPool: TProcThreadPool; E: Exception);
public
constructor Create;
destructor Destroy; override;
procedure Execute; override;
property Item: TMultiThreadProcItem read FItem;
end;
TMTMethod = procedure(Index: PtrInt; Data: Pointer;
Item: TMultiThreadProcItem) of object;
TMTProcedure = procedure(Index: PtrInt; Data: Pointer;
Item: TMultiThreadProcItem);
{ TProcThreadGroup
Each task creates a new group of threads.
A group can either need more threads or it has finished and waits for its
threads to end.
The thread that created the group is not in the list FFirstThread. }
TMTPGroupState = (
mtpgsNone,
mtpgsNeedThreads, // the groups waiting for more threads to help
mtpgsFinishing, // the groups waiting for its threads to finish
mtpgsException // there was an exception => close asap
);
TProcThreadGroup = class
private
FEndIndex: PtrInt;
FException: Exception;
FFirstRunningIndex: PtrInt;
FFirstThread: TProcThread;
FLastRunningIndex: PtrInt;
FMaxThreads: PtrInt;
FNext, FPrev: TProcThreadGroup;
FPool: TProcThreadPool;
FStarterItem: TMultiThreadProcItem;
FStartIndex: PtrInt;
FState: TMTPGroupState;
FTaskData: Pointer;
FTaskFrame: Pointer;
FTaskMethod: TMTMethod;
FTaskProcedure: TMTProcedure;
FThreadCount: PtrInt;
procedure AddToList(var First: TProcThreadGroup; ListType: TMTPGroupState); inline;
procedure RemoveFromList(var First: TProcThreadGroup); inline;
function NeedMoreThreads: boolean; inline;
procedure IncreaseLastRunningIndex(Item: TMultiThreadProcItem);
procedure AddThread(AThread: TProcThread);
procedure RemoveThread(AThread: TProcThread); inline;
procedure Run(Index: PtrInt; Data: Pointer; Item: TMultiThreadProcItem); inline;
procedure IndexComplete(Index: PtrInt);
procedure WakeThreadsWaitingForIndex;
function HasFinishedIndex(aStartIndex, aEndIndex: PtrInt): boolean;
procedure EnterExceptionState(E: Exception);
public
constructor Create;
destructor Destroy; override;
property Pool: TProcThreadPool read FPool;
property StartIndex: PtrInt read FStartIndex;
property EndIndex: PtrInt read FEndIndex;
property FirstRunningIndex: PtrInt read FFirstRunningIndex; // first started
property LastRunningIndex: PtrInt read FLastRunningIndex; // last started
property TaskData: Pointer read FTaskData;
property TaskMethod: TMTMethod read FTaskMethod;
property TaskProcedure: TMTProcedure read FTaskProcedure;
property TaskFrame: Pointer read FTaskFrame;
property MaxThreads: PtrInt read FMaxThreads;
property StarterItem: TMultiThreadProcItem read FStarterItem;
end;
{ TLightWeightThreadPool
Group 0 are the inactive threads }
{ TProcThreadPool }
TProcThreadPool = class
private
FMaxThreadCount: PtrInt;
FThreadCount: PtrInt;
FFirstInactiveThread: TProcThread;
FFirstActiveThread: TProcThread;
FFirstTerminatedThread: TProcThread;
FFirstGroupNeedThreads: TProcThreadGroup;
FFirstGroupFinishing: TProcThreadGroup;
FCritSection: TRTLCriticalSection;
FDestroying: boolean;
procedure SetMaxThreadCount(const AValue: PtrInt);
procedure CleanTerminatedThreads;
procedure DoParallelIntern(const AMethod: TMTMethod;
const AProc: TMTProcedure; const AFrame: Pointer;
StartIndex, EndIndex: PtrInt;
Data: Pointer = nil; MaxThreads: PtrInt = 0);
public
// for debugging only: the critical section is public:
procedure EnterPoolCriticalSection; inline;
procedure LeavePoolCriticalSection; inline;
public
constructor Create;
destructor Destroy; override;
procedure DoParallel(const AMethod: TMTMethod;
StartIndex, EndIndex: PtrInt;
Data: Pointer = nil; MaxThreads: PtrInt = 0); inline;
procedure DoParallel(const AProc: TMTProcedure;
StartIndex, EndIndex: PtrInt;
Data: Pointer = nil; MaxThreads: PtrInt = 0); inline;
// experimental
procedure DoParallelLocalProc(const LocalProc: Pointer;
StartIndex, EndIndex: PtrInt;
Data: Pointer = nil; MaxThreads: PtrInt = 0); // do not make this inline!
// utility functions for loops:
procedure CalcBlockSize(LoopLength: PtrInt;
out BlockCount, BlockSize: PtrInt; MinBlockSize: PtrInt = 0); inline;
public
property MaxThreadCount: PtrInt read FMaxThreadCount write SetMaxThreadCount;
property ThreadCount: PtrInt read FThreadCount;
end;
var
ProcThreadPool: TProcThreadPool = nil;
threadvar
CurrentThread: TThread; // TProcThread sets this, you can set this for your own TThreads descendants
implementation
{ TMultiThreadProcItem }
destructor TMultiThreadProcItem.Destroy;
begin
if fWaitForPool<>nil then begin
RTLeventdestroy(fWaitForPool);
fWaitForPool:=nil;
end;
inherited Destroy;
end;
function TMultiThreadProcItem.WaitForIndexRange(
StartIndex, EndIndex: PtrInt): boolean;
var
aPool: TProcThreadPool;
begin
//WriteLn('TLightWeightThreadItem.WaitForIndexRange START Index='+IntToStr(Index)+' StartIndex='+IntToStr(StartIndex)+' EndIndex='+IntToStr(EndIndex));
if (EndIndex>=Index) then exit(false);
if EndIndex<StartIndex then exit(true);
if Group=nil then exit(true); // a single threaded group has no group object
// multi threaded group
aPool:=Group.Pool;
if aPool.FDestroying then exit(false); // no more wait allowed
aPool.EnterPoolCriticalSection;
try
if Group.FState=mtpgsException then begin
//WriteLn('TLightWeightThreadItem.WaitForIndexRange Index='+IntToStr(Index)+', Group closing because of error');
exit(false);
end;
if Group.HasFinishedIndex(StartIndex,EndIndex) then begin
//WriteLn('TLightWeightThreadItem.WaitForIndexRange Index='+IntToStr(Index)+', range already finished');
exit(true);
end;
FState:=mtptsWaitingForIndex;
FWaitingForIndexStart:=StartIndex;
FWaitingForIndexEnd:=EndIndex;
if fWaitForPool=nil then
fWaitForPool:=RTLEventCreate;
RTLeventResetEvent(fWaitForPool);
finally
aPool.LeavePoolCriticalSection;
end;
//WriteLn('TLightWeightThreadItem.WaitForIndexRange '+IntToStr(Index)+' waiting ... ');
RTLeventWaitFor(fWaitForPool);
Result:=FState=mtptsActive;
FState:=mtptsActive;
//WriteLn('TLightWeightThreadItem.WaitForIndexRange END '+IntToStr(Index));
end;
function TMultiThreadProcItem.WaitForIndex(Index: PtrInt): boolean; inline;
begin
Result:=WaitForIndexRange(Index,Index);
end;
procedure TMultiThreadProcItem.CalcBlock(Index, BlockSize, LoopLength: PtrInt;
out BlockStart, BlockEnd: PtrInt);
begin
BlockStart:=BlockSize*Index;
BlockEnd:=BlockStart+BlockSize;
if LoopLength<BlockEnd then BlockEnd:=LoopLength;
dec(BlockEnd);
end;
{ TProcThread }
procedure TProcThread.AddToList(var First: TProcThread;
ListType: TMTPThreadList);
begin
FNext[ListType]:=First;
if FNext[ListType]<>nil then
FNext[ListType].FPrev[ListType]:=Self;
First:=Self;
end;
procedure TProcThread.RemoveFromList(var First: TProcThread;
ListType: TMTPThreadList);
begin
if First=Self then
First:=FNext[ListType];
if FNext[ListType]<>nil then
FNext[ListType].FPrev[ListType]:=FPrev[ListType];
if FPrev[ListType]<>nil then
FPrev[ListType].FNext[ListType]:=FNext[ListType];
FNext[ListType]:=nil;
FPrev[ListType]:=nil;
end;
procedure TProcThread.Terminating(aPool: TProcThreadPool;
E: Exception);
begin
aPool.EnterPoolCriticalSection;
try
// remove from group
if Item.FGroup<>nil then begin
// an exception occured
Item.FGroup.EnterExceptionState(E);
Item.FGroup.RemoveThread(Self);
Item.FGroup:=nil;
end;
// move to pool's terminated threads
case Item.FState of
mtptsActive: RemoveFromList(aPool.FFirstActiveThread,mtptlPool);
mtptsInactive: RemoveFromList(aPool.FFirstInactiveThread,mtptlPool);
end;
AddToList(aPool.FFirstTerminatedThread,mtptlPool);
Item.FState:=mtptsTerminated;
finally
aPool.LeavePoolCriticalSection;
end;
end;
constructor TProcThread.Create;
begin
inherited Create(true);
fItem:=TMultiThreadProcItem.Create;
fItem.fWaitForPool:=RTLEventCreate;
fItem.FThread:=Self;
end;
destructor TProcThread.Destroy;
begin
FreeAndNil(FItem);
inherited Destroy;
end;
procedure TProcThread.Execute;
var
aPool: TProcThreadPool;
Group: TProcThreadGroup;
ok: Boolean;
E: Exception;
begin
CurrentThread:=Self;
aPool:=Item.Group.Pool;
ok:=false;
try
repeat
// work
Group:=Item.Group;
Group.Run(Item.Index,Group.TaskData,Item);
aPool.EnterPoolCriticalSection;
try
Group.IndexComplete(Item.Index);
// find next work
if Group.LastRunningIndex<Group.EndIndex then begin
// next index of group
Group.IncreaseLastRunningIndex(Item);
end else begin
// remove from group
RemoveFromList(Group.FFirstThread,mtptlGroup);
dec(Group.FThreadCount);
Item.FGroup:=nil;
Group:=nil;
if aPool.FFirstGroupNeedThreads<>nil then begin
// add to new group
aPool.FFirstGroupNeedThreads.AddThread(Self);
Group:=Item.Group;
end else begin
// mark inactive
RemoveFromList(aPool.FFirstActiveThread,mtptlPool);
AddToList(aPool.FFirstInactiveThread,mtptlPool);
Item.FState:=mtptsInactive;
RTLeventResetEvent(Item.fWaitForPool);
end;
end;
finally
aPool.LeavePoolCriticalSection;
end;
// wait for new work
if Item.FState=mtptsInactive then
RTLeventWaitFor(Item.fWaitForPool);
until Item.Group=nil;
ok:=true;
except
// stop the exception and store it
E:=Exception(AcquireExceptionObject);
Terminating(aPool,E);
end;
if ok then
Terminating(aPool,nil);
end;
{ TProcThreadGroup }
procedure TProcThreadGroup.AddToList(var First: TProcThreadGroup;
ListType: TMTPGroupState);
begin
FNext:=First;
if FNext<>nil then
FNext.FPrev:=Self;
First:=Self;
FState:=ListType;
end;
procedure TProcThreadGroup.RemoveFromList(
var First: TProcThreadGroup);
begin
if First=Self then
First:=FNext;
if FNext<>nil then
FNext.FPrev:=FPrev;
if FPrev<>nil then
FPrev.FNext:=FNext;
FNext:=nil;
FPrev:=nil;
FState:=mtpgsNone;
end;
function TProcThreadGroup.NeedMoreThreads: boolean;
begin
Result:=(FLastRunningIndex<FEndIndex) and (FThreadCount<FMaxThreads)
and (FState<>mtpgsException);
end;
procedure TProcThreadGroup.IncreaseLastRunningIndex(Item: TMultiThreadProcItem);
begin
inc(FLastRunningIndex);
Item.FIndex:=FLastRunningIndex;
if NeedMoreThreads then exit;
if FState=mtpgsNeedThreads then begin
RemoveFromList(Pool.FFirstGroupNeedThreads);
AddToList(Pool.FFirstGroupFinishing,mtpgsFinishing);
end;
end;
procedure TProcThreadGroup.AddThread(AThread: TProcThread);
begin
AThread.Item.FGroup:=Self;
AThread.AddToList(FFirstThread,mtptlGroup);
inc(FThreadCount);
IncreaseLastRunningIndex(AThread.Item);
end;
procedure TProcThreadGroup.RemoveThread(AThread: TProcThread);
begin
AThread.RemoveFromList(FFirstThread,mtptlGroup);
dec(FThreadCount);
end;
procedure TProcThreadGroup.Run(Index: PtrInt; Data: Pointer;
Item: TMultiThreadProcItem); inline;
begin
if Assigned(FTaskFrame) then begin
CallLocalProc(FTaskProcedure,FTaskFrame,Index,Data,Item)
end else begin
if Assigned(FTaskProcedure) then
FTaskProcedure(Index,Data,Item)
else
FTaskMethod(Index,Data,Item)
end;
end;
procedure TProcThreadGroup.IndexComplete(Index: PtrInt);
var
AThread: TProcThread;
NewFirstRunningThread: PtrInt;
begin
// update FirstRunningIndex
NewFirstRunningThread:=FStarterItem.Index;
AThread:=FFirstThread;
while AThread<>nil do begin
if (NewFirstRunningThread>aThread.Item.Index)
and (aThread.Item.Index<>Index) then
NewFirstRunningThread:=aThread.Item.Index;
aThread:=aThread.FNext[mtptlGroup];
end;
FFirstRunningIndex:=NewFirstRunningThread;
// wake up threads (Note: do this even if FFirstRunningIndex has not changed)
WakeThreadsWaitingForIndex;
end;
procedure TProcThreadGroup.WakeThreadsWaitingForIndex;
var
aThread: TProcThread;
begin
if FState<>mtpgsException then begin
// wake up waiting threads
aThread:=FFirstThread;
while aThread<>nil do begin
if (aThread.Item.FState=mtptsWaitingForIndex)
and HasFinishedIndex(aThread.Item.WaitingForIndexStart,
aThread.Item.WaitingForIndexEnd)
then begin
// wake up the thread
aThread.Item.FState:=mtptsActive;
RTLeventSetEvent(aThread.Item.fWaitForPool);
end;
aThread:=aThread.FNext[mtptlGroup];
end;
if (FStarterItem.FState=mtptsWaitingForIndex)
and HasFinishedIndex(FStarterItem.WaitingForIndexStart,FStarterItem.WaitingForIndexEnd)
then begin
// wake up the starter thread of this group
FStarterItem.FState:=mtptsActive;
RTLeventSetEvent(FStarterItem.fWaitForPool);
end;
end else begin
// end group: wake up waiting threads
aThread:=FFirstThread;
while aThread<>nil do begin
if (aThread.Item.FState=mtptsWaitingForIndex)
then begin
// end group: wake up the thread
aThread.Item.FState:=mtptsWaitingFailed;
RTLeventSetEvent(aThread.Item.fWaitForPool);
end;
aThread:=aThread.FNext[mtptlGroup];
end;
if (FStarterItem.FState=mtptsWaitingForIndex)
then begin
// end group: wake up the starter thread of this group
FStarterItem.FState:=mtptsWaitingFailed;
RTLeventSetEvent(FStarterItem.fWaitForPool);
end;
end;
end;
function TProcThreadGroup.HasFinishedIndex(
aStartIndex, aEndIndex: PtrInt): boolean;
var
AThread: TProcThread;
begin
// test the finished range
if FFirstRunningIndex>aEndIndex then exit(true);
// test the unfinished range
if FLastRunningIndex<aEndIndex then exit(false);
// test the active range
AThread:=FFirstThread;
while AThread<>nil do begin
if (AThread.Item.Index>=aStartIndex)
and (AThread.Item.Index<=aEndIndex) then
exit(false);
AThread:=AThread.FNext[mtptlGroup];
end;
if (FStarterItem.Index>=aStartIndex)
and (FStarterItem.Index<=aEndIndex) then
exit(false);
Result:=true;
end;
procedure TProcThreadGroup.EnterExceptionState(E: Exception);
begin
if FState=mtpgsException then exit;
case FState of
mtpgsFinishing: RemoveFromList(Pool.FFirstGroupFinishing);
mtpgsNeedThreads: RemoveFromList(Pool.FFirstGroupNeedThreads);
end;
FState:=mtpgsException;
FException:=E;
WakeThreadsWaitingForIndex;
end;
constructor TProcThreadGroup.Create;
begin
FStarterItem:=TMultiThreadProcItem.Create;
FStarterItem.FGroup:=Self;
end;
destructor TProcThreadGroup.Destroy;
begin
FreeAndNil(FStarterItem);
inherited Destroy;
end;
{ TProcThreadPool }
procedure TProcThreadPool.SetMaxThreadCount(const AValue: PtrInt);
begin
if FMaxThreadCount=AValue then exit;
if AValue<1 then raise Exception.Create('TLightWeightThreadPool.SetMaxThreadCount');
FMaxThreadCount:=AValue;
end;
procedure TProcThreadPool.CleanTerminatedThreads;
var
AThread: TProcThread;
begin
while FFirstTerminatedThread<>nil do begin
AThread:=FFirstTerminatedThread;
AThread.RemoveFromList(FFirstTerminatedThread,mtptlPool);
AThread.Free;
end;
end;
constructor TProcThreadPool.Create;
begin
FMaxThreadCount:=GetSystemThreadCount;
if FMaxThreadCount<1 then
FMaxThreadCount:=1;
InitCriticalSection(FCritSection);
end;
destructor TProcThreadPool.Destroy;
procedure WakeWaitingStarterItems(Group: TProcThreadGroup);
begin
while Group<>nil do begin
if Group.StarterItem.FState=mtptsWaitingForIndex then begin
Group.StarterItem.FState:=mtptsWaitingFailed;
RTLeventSetEvent(Group.StarterItem.fWaitForPool);
end;
Group:=Group.FNext;
end;
end;
var
AThread: TProcThread;
begin
FDestroying:=true;
// wake up all waiting threads
EnterPoolCriticalSection;
try
AThread:=FFirstActiveThread;
while AThread<>nil do begin
if aThread.Item.FState=mtptsWaitingForIndex then begin
aThread.Item.FState:=mtptsWaitingFailed;
RTLeventSetEvent(AThread.Item.fWaitForPool);
end;
AThread:=AThread.FNext[mtptlPool];
end;
WakeWaitingStarterItems(FFirstGroupNeedThreads);
WakeWaitingStarterItems(FFirstGroupFinishing);
finally
LeavePoolCriticalSection;
end;
// wait for all active threads to become inactive
while FFirstActiveThread<>nil do
Sleep(10);
// wake up all inactive threads (without new work they will terminate)
EnterPoolCriticalSection;
try
AThread:=FFirstInactiveThread;
while AThread<>nil do begin
RTLeventSetEvent(AThread.Item.fWaitForPool);
AThread:=AThread.FNext[mtptlPool];
end;
finally
LeavePoolCriticalSection;
end;
// wait for all threads to terminate
while FFirstInactiveThread<>nil do
Sleep(10);
// free threads
CleanTerminatedThreads;
DoneCriticalsection(FCritSection);
inherited Destroy;
end;
procedure TProcThreadPool.EnterPoolCriticalSection;
begin
EnterCriticalsection(FCritSection);
end;
procedure TProcThreadPool.LeavePoolCriticalSection;
begin
LeaveCriticalsection(FCritSection);
end;
procedure TProcThreadPool.DoParallel(const AMethod: TMTMethod;
StartIndex, EndIndex: PtrInt; Data: Pointer; MaxThreads: PtrInt);
begin
if not Assigned(AMethod) then exit;
DoParallelIntern(AMethod,nil,nil,StartIndex,EndIndex,Data,MaxThreads);
end;
procedure TProcThreadPool.DoParallel(const AProc: TMTProcedure;
StartIndex, EndIndex: PtrInt; Data: Pointer; MaxThreads: PtrInt);
begin
if not Assigned(AProc) then exit;
DoParallelIntern(nil,AProc,nil,StartIndex,EndIndex,Data,MaxThreads);
end;
procedure TProcThreadPool.DoParallelLocalProc(const LocalProc: Pointer;
StartIndex, EndIndex: PtrInt; Data: Pointer; MaxThreads: PtrInt);
var
Frame: Pointer;
begin
if not Assigned(LocalProc) then exit;
Frame:=get_caller_frame(get_frame);
DoParallelIntern(nil,TMTProcedure(LocalProc),Frame,StartIndex,EndIndex,
Data,MaxThreads);
end;
procedure TProcThreadPool.CalcBlockSize(LoopLength: PtrInt; out BlockCount,
BlockSize: PtrInt; MinBlockSize: PtrInt);
begin
if LoopLength<=0 then begin
BlockCount:=0;
BlockSize:=1;
exit;
end;
// split work into equally sized blocks
BlockCount:=ProcThreadPool.MaxThreadCount;
BlockSize:=(LoopLength div BlockCount);
if (BlockSize<MinBlockSize) then BlockSize:=MinBlockSize;
BlockCount:=((LoopLength-1) div BlockSize)+1;
end;
procedure TProcThreadPool.DoParallelIntern(const AMethod: TMTMethod;
const AProc: TMTProcedure; const AFrame: Pointer;
StartIndex, EndIndex: PtrInt; Data: Pointer; MaxThreads: PtrInt);
var
Group: TProcThreadGroup;
Index: PtrInt;
AThread: TProcThread;
NewThread: Boolean;
Item: TMultiThreadProcItem;
HelperThreadException: Exception;
begin
if (StartIndex>EndIndex) then exit; // nothing to do
if FDestroying then raise Exception.Create('Pool destroyed');
if (MaxThreads>MaxThreadCount) or (MaxThreads<=0) then
MaxThreads:=MaxThreadCount;
if (StartIndex=EndIndex) or (MaxThreads<=1) then begin
// single threaded
Item:=TMultiThreadProcItem.Create;
try
for Index:=StartIndex to EndIndex do begin
Item.FIndex:=Index;
if Assigned(AFrame) then begin
CallLocalProc(AProc,AFrame,Index,Data,Item)
end else begin
if Assigned(AProc) then
AProc(Index,Data,Item)
else
AMethod(Index,Data,Item)
end;
end;
finally
Item.Free;
end;
exit;
end;
// create a new group
Group:=TProcThreadGroup.Create;
Group.FPool:=Self;
Group.FTaskData:=Data;
Group.FTaskMethod:=AMethod;
Group.FTaskProcedure:=AProc;
Group.FTaskFrame:=AFrame;
Group.FStartIndex:=StartIndex;
Group.FEndIndex:=EndIndex;
Group.FFirstRunningIndex:=StartIndex;
Group.FLastRunningIndex:=StartIndex;
Group.FMaxThreads:=MaxThreads;
Group.FThreadCount:=1;
Group.FStarterItem.FState:=mtptsActive;
Group.FStarterItem.FIndex:=StartIndex;
HelperThreadException:=nil;
try
// start threads
EnterPoolCriticalSection;
try
Group.AddToList(FFirstGroupNeedThreads,mtpgsNeedThreads);
while Group.NeedMoreThreads do begin
AThread:=FFirstInactiveThread;
NewThread:=false;
if AThread<>nil then begin
AThread.RemoveFromList(FFirstInactiveThread,mtptlPool);
end else if FThreadCount<FMaxThreadCount then begin
AThread:=TProcThread.Create;
if Assigned(AThread.FatalException) then
raise AThread.FatalException;
NewThread:=true;
inc(FThreadCount);
end else begin
break;
end;
// add to Group
Group.AddThread(AThread);
// start thread
AThread.AddToList(FFirstActiveThread,mtptlPool);
AThread.Item.FState:=mtptsActive;
if NewThread then
{$IF defined(VER2_4_2) or defined(VER2_4_3)}
AThread.Resume
{$ELSE}
AThread.Start
{$ENDIF}
else
RTLeventSetEvent(AThread.Item.fWaitForPool);
end;
finally
LeavePoolCriticalSection;
end;
// run until no more Index left
Index:=StartIndex;
repeat
Group.FStarterItem.FIndex:=Index;
Group.Run(Index,Data,Group.FStarterItem);
EnterPoolCriticalSection;
try
Group.IndexComplete(Index);
if (Group.FLastRunningIndex<Group.EndIndex) and (Group.FState<>mtpgsException)
then begin
inc(Group.FLastRunningIndex);
Index:=Group.FLastRunningIndex;
end else begin
Index:=StartIndex;
end;
finally
LeavePoolCriticalSection;
end;
until Index=StartIndex;
finally
// wait for Group to finish
if Group.FFirstThread<>nil then begin
EnterPoolCriticalSection;
try
Group.FStarterItem.FState:=mtptsInactive;
Group.FStarterItem.fIndex:=EndIndex;// needed for Group.HasFinishedIndex
// wake threads waiting for starter thread to finish
if Group.FStarterItem.FState<>mtptsInactive then
Group.EnterExceptionState(nil)
else
Group.WakeThreadsWaitingForIndex;
finally
LeavePoolCriticalSection;
end;
// waiting with exponential spin lock
Index:=0;
while Group.FFirstThread<>nil do begin
sleep(Index);
Index:=Index*2+1;
if Index>30 then Index:=30;
end;
end;
// remove group from pool
EnterPoolCriticalSection;
try
case Group.FState of
mtpgsNeedThreads: Group.RemoveFromList(FFirstGroupNeedThreads);
mtpgsFinishing: Group.RemoveFromList(FFirstGroupFinishing);
end;
finally
LeavePoolCriticalSection;
end;
HelperThreadException:=Group.FException;
Group.Free;
// free terminated threads (terminated, because of exceptions)
CleanTerminatedThreads;
end;
// if the exception occured in a helper thread raise it now
if HelperThreadException<>nil then
raise HelperThreadException;
end;
initialization
ProcThreadPool:=TProcThreadPool.Create;
CurrentThread:=nil;
finalization
ProcThreadPool.Free;
ProcThreadPool:=nil;
end.

View File

@ -0,0 +1,212 @@
{ Utilities using light weight threads.
This file is part of the Free Pascal run time library.
Copyright (C) 2008 Mattias Gaertner mattias@freepascal.org
See the file COPYING.FPC, included in this distribution,
for details about the copyright.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
**********************************************************************}
{
Abstract:
Utility functions using mtprocs.
For example a parallel sort.
}
unit MTPUtils;
{$mode objfpc}{$H+}
interface
uses
Classes, SysUtils, MTProcs;
type
TSortPartEvent = procedure(aList: PPointer; aCount: PtrInt);
{ TParallelSortPointerList }
TParallelSortPointerList = class
protected
fBlockSize: PtrInt;
fBlockCntPowOf2Offset: PtrInt;
FMergeBuffer: PPointer;
procedure MTPSort(Index: PtrInt; {%H-}Data: Pointer; Item: TMultiThreadProcItem);
public
List: PPointer;
Count: PtrInt;
Compare: TListSortCompare;
BlockCnt: PtrInt;
OnSortPart: TSortPartEvent;
constructor Create(aList: PPointer; aCount: PtrInt; const aCompare: TListSortCompare;
MaxThreadCount: integer = 0);
procedure Sort;
end;
{ Sort a list in parallel using merge sort.
You must provide a compare function.
You can provide your own sort function for the blocks which are sorted in a
single thread, for example a normal quicksort. }
procedure ParallelSortFPList(List: TFPList; const Compare: TListSortCompare;
MaxThreadCount: integer = 0; const OnSortPart: TSortPartEvent = nil);
implementation
procedure ParallelSortFPList(List: TFPList; const Compare: TListSortCompare;
MaxThreadCount: integer; const OnSortPart: TSortPartEvent);
var
Sorter: TParallelSortPointerList;
begin
if List.Count<=1 then exit;
Sorter:=TParallelSortPointerList.Create(@List.List[0],List.Count,Compare,
MaxThreadCount);
try
Sorter.OnSortPart:=OnSortPart;
Sorter.Sort;
finally
Sorter.Free;
end;
end;
{ TParallelSortPointerList }
procedure TParallelSortPointerList.MTPSort(Index: PtrInt; Data: Pointer;
Item: TMultiThreadProcItem);
procedure MergeSort(L, M, R: PtrInt; Recursive: boolean);
var
Src1: PtrInt;
Src2: PtrInt;
Dest1: PtrInt;
begin
if R-L<=1 then begin
// sort lists of 1 and 2 items directly
if L<R then begin
if Compare(List[L],List[R])>0 then begin
FMergeBuffer[L]:=List[L];
List[L]:=List[R];
List[R]:=FMergeBuffer[L];
end;
end;
exit;
end;
// sort recursively
if Recursive then begin
MergeSort(L,(L+M) div 2,M-1,true);
MergeSort(M,(M+R+1) div 2,R,true);
end;
// merge both blocks
Src1:=L;
Src2:=M;
Dest1:=L;
repeat
if (Src1<M)
and ((Src2>R) or (Compare(List[Src1],List[Src2])<=0)) then begin
FMergeBuffer[Dest1]:=List[Src1];
inc(Dest1);
inc(Src1);
end else if (Src2<=R) then begin
FMergeBuffer[Dest1]:=List[Src2];
inc(Dest1);
inc(Src2);
end else
break;
until false;
// write the mergebuffer back
Src1:=L;
Dest1:=l;
while Src1<=R do begin
List[Dest1]:=FMergeBuffer[Src1];
inc(Src1);
inc(Dest1);
end;
end;
var
L, M, R: PtrInt;
i: integer;
NormIndex: Integer;
Range: integer;
MergeIndex: Integer;
begin
L:=fBlockSize*Index;
R:=L+fBlockSize-1;
if R>=Count then
R:=Count-1; // last block
//WriteLn('TParallelSortPointerList.LWTSort Index=',Index,' sort block: ',L,' ',(L+R+1) div 2,' ',R);
if Assigned(OnSortPart) then
OnSortPart(@List[L],R-L+1)
else
MergeSort(L,(L+R+1) div 2,R,true);
// merge
// 0 1 2 3 4 5 6 7
// \/ \/ \/ \/
// \/ \/
// \/
// For example: BlockCnt = 5 => Index in 0..4
// fBlockCntPowOf2Offset = 3 (=8-5)
// NormIndex = Index + 3 => NormIndex in 3..7
NormIndex:=Index+fBlockCntPowOf2Offset;
i:=0;
repeat
Range:=1 shl i;
if NormIndex and Range=0 then break;
// merge left and right block(s)
MergeIndex:=NormIndex-Range-fBlockCntPowOf2Offset;
if (MergeIndex+Range-1>=0) then begin
// wait until left blocks have finished
//WriteLn('TParallelSortPointerList.LWTSort Index=',Index,' wait for block ',MergeIndex);
if (MergeIndex>=0) and (not Item.WaitForIndex(MergeIndex)) then exit;
// compute left and right block bounds
M:=L;
L:=(MergeIndex-Range+1)*fBlockSize;
if L<0 then L:=0;
//WriteLn('TParallelSortPointerList.LWTSort Index=',Index,' merge blocks ',L,' ',M,' ',R);
MergeSort(L,M,R,false);
end;
inc(i);
until false;
//WriteLn('TParallelSortPointerList.LWTSort END Index='+IntToStr(Index));
end;
constructor TParallelSortPointerList.Create(aList: PPointer; aCount: PtrInt;
const aCompare: TListSortCompare; MaxThreadCount: integer);
begin
List:=aList;
Count:=aCount;
Compare:=aCompare;
BlockCnt:=Count div 100; // at least 100 items per thread
if BlockCnt>ProcThreadPool.MaxThreadCount then
BlockCnt:=ProcThreadPool.MaxThreadCount;
if (MaxThreadCount>0) and (BlockCnt>MaxThreadCount) then
BlockCnt:=MaxThreadCount;
if BlockCnt<1 then BlockCnt:=1;
end;
procedure TParallelSortPointerList.Sort;
begin
if (Count<=1) then exit;
fBlockSize:=(Count+BlockCnt-1) div BlockCnt;
fBlockCntPowOf2Offset:=1;
while fBlockCntPowOf2Offset<BlockCnt do
fBlockCntPowOf2Offset:=fBlockCntPowOf2Offset*2;
fBlockCntPowOf2Offset:=fBlockCntPowOf2Offset-BlockCnt;
//WriteLn('TParallelSortPointerList.Sort BlockCnt=',BlockCnt,' fBlockSize=',fBlockSize,' fBlockCntPowOf2Offset=',fBlockCntPowOf2Offset);
GetMem(FMergeBuffer,SizeOf(Pointer)*Count);
try
ProcThreadPool.DoParallel(@MTPSort,0,BlockCnt-1);
finally
FreeMem(FMergeBuffer);
FMergeBuffer:=nil;
end;
end;
end.

View File

@ -0,0 +1,51 @@
<?xml version="1.0"?>
<CONFIG>
<Package Version="4">
<Name Value="MultiThreadProcsLaz"/>
<Author Value="Mattias Gaertner mattias@freepascal.org"/>
<CompilerOptions>
<Version Value="10"/>
<SearchPaths>
<UnitOutputDirectory Value="lib/$(TargetCPU)-$(TargetOS)"/>
</SearchPaths>
<Other>
<CompilerMessages>
<UseMsgFile Value="True"/>
</CompilerMessages>
<CompilerPath Value="$(CompPath)"/>
</Other>
</CompilerOptions>
<Description Value="Running procedures and methods parallel via a thread pool."/>
<License Value="modified LGPL2"/>
<Version Major="1" Minor="2" Release="1"/>
<Files Count="3">
<Item1>
<Filename Value="mtprocs.pas"/>
<UnitName Value="MTProcs"/>
</Item1>
<Item2>
<Filename Value="mtputils.pas"/>
<UnitName Value="MTPUtils"/>
</Item2>
<Item3>
<Filename Value="mtpcpu.pas"/>
<UnitName Value="MTPCPU"/>
</Item3>
</Files>
<Type Value="RunAndDesignTime"/>
<RequiredPkgs Count="1">
<Item1>
<PackageName Value="FCL"/>
<MinVersion Major="1" Valid="True"/>
</Item1>
</RequiredPkgs>
<UsageOptions>
<CustomOptions Value="-dUseCThreads"/>
<UnitPath Value="$(PkgOutDir)"/>
</UsageOptions>
<PublishOptions>
<Version Value="2"/>
<IgnoreBinaries Value="False"/>
</PublishOptions>
</Package>
</CONFIG>

View File

@ -0,0 +1,20 @@
{ This file was automatically created by Lazarus. do not edit!
This source is only used to compile and install the package.
}
unit MultiThreadProcsLaz;
interface
uses
MTProcs, MTPUtils, MTPCPU, LazarusPackageIntf;
implementation
procedure Register;
begin
end;
initialization
RegisterPackage('MultiThreadProcsLaz', @Register);
end.

View File

@ -0,0 +1 @@
$(LazarusDir)/components/multithreadprocs/multithreadprocslaz.lpk