* fixed semaphore implementation based on file handles (select needs

file descriptor + 1 as first parameter, select can also be EIntr)
  * changed IntbasiceventWaitFor (used by syncobjs) so it can emulate
    timeouts (using a loop and short sleeps) (mantis #9414)
  * also added wrAbandoned support to IntbasiceventWaitFor
  * enhanced tbrtlevt.pp to test new functionality

git-svn-id: trunk@8257 -
This commit is contained in:
Jonas Maebe 2007-08-10 20:20:44 +00:00
parent 908a116cfc
commit 494fb81dd8
2 changed files with 275 additions and 46 deletions

View File

@ -80,6 +80,8 @@ Type PINTRTLEvent = ^TINTRTLEvent;
isset: boolean;
end;
TTryWaitResult = (tw_error, tw_semwasunlocked, tw_semwaslocked);
{*****************************************************************************
Threadvar support
*****************************************************************************}
@ -325,14 +327,12 @@ Type PINTRTLEvent = ^TINTRTLEvent;
CWaitForThreadTerminate := dword(LResultP);
end;
{$warning threadhandle can be larger than a dword}
function CThreadSetPriority (threadHandle : TThreadID; Prio: longint): boolean; {-15..+15, 0=normal}
begin
{$Warning ThreadSetPriority needs to be implemented}
end;
{$warning threadhandle can be larger than a dword}
function CThreadGetPriority (threadHandle : TThreadID): Integer;
begin
{$Warning ThreadGetPriority needs to be implemented}
@ -400,7 +400,6 @@ Type PINTRTLEvent = ^TINTRTLEvent;
Semaphore routines
*****************************************************************************}
procedure cSemaphoreWait(const FSem: Pointer);
var
res: cint;
@ -422,6 +421,7 @@ begin
{$endif}
end;
procedure cSemaphorePost(const FSem: Pointer);
{$if defined(has_sem_init) or defined(has_sem_open)}
begin
@ -442,6 +442,50 @@ end;
{$endif}
function cSemaphoreTryWait(const FSem: pointer): TTryWaitResult;
var
res: cint;
err: cint;
{$if defined(has_sem_init) or defined(has_sem_open)}
begin
repeat
res:=sem_trywait(FSem);
err:=fpgeterrno;
until (res<>-1) or (err<>ESysEINTR);
if (res=0) then
result:=tw_semwasunlocked
else if (err=ESysEAgain) then
result:=tw_semwaslocked
else
result:=tw_error;
{$else has_sem_init or has_sem_open}
var
fds: TFDSet;
tv : timeval;
begin
tv.tv_sec:=0;
tv.tv_usec:=0;
fpFD_ZERO(fds);
fpFD_SET(PFilDes(FSem)^[0],fds);
repeat
res:=fpselect(PFilDes(FSem)^[0]+1,@fds,nil,nil,@tv);
err:=fpgeterrno;
until (res>=0) or ((res=-1) and (err<>ESysEIntr));
if (res>0) then
begin
cSemaphoreWait(FSem);
result:=tw_semwasunlocked
end
else if (res=0) then
result:=tw_semwaslocked
else
result:=tw_error;
{$endif has_sem_init or has_sem_open}
end;
{$if defined(has_sem_open) and not defined(has_sem_init)}
function cIntSemaphoreOpen(const name: pchar; initvalue: boolean): Pointer;
var
@ -525,7 +569,9 @@ type
Tbasiceventstate=record
FSem: Pointer;
FEventSection: TPthreadMutex;
FManualReset: Boolean;
FWaiters: longint;
FManualReset,
FDestroying: Boolean;
end;
plocaleventstate = ^tbasiceventstate;
// peventstate=pointer;
@ -544,6 +590,8 @@ var
begin
new(plocaleventstate(result));
plocaleventstate(result)^.FManualReset:=AManualReset;
plocaleventstate(result)^.FWaiters:=0;
plocaleventstate(result)^.FDestroying:=False;
{$ifdef has_sem_init}
plocaleventstate(result)^.FSem:=cIntSemaphoreInit(true);
if plocaleventstate(result)^.FSem=nil then
@ -592,49 +640,49 @@ begin
end;
procedure Intbasiceventdestroy(state:peventstate);
var
i: longint;
begin
{ safely mark that we are destroying this event }
pthread_mutex_lock(@plocaleventstate(state)^.feventsection);
plocaleventstate(state)^.FDestroying:=true;
{ wake up everyone who is waiting }
for i := 1 to plocaleventstate(state)^.FWaiters do
cSemaphorePost(plocaleventstate(state)^.FSem);
pthread_mutex_unlock(@plocaleventstate(state)^.feventsection);
{ now wait until they've finished their business }
while (plocaleventstate(state)^.FWaiters <> 0) do
cThreadSwitch;
{ and clean up }
cSemaphoreDestroy(plocaleventstate(state)^.FSem);
FreeMem(state);
dispose(plocaleventstate(state));
end;
procedure IntbasiceventResetEvent(state:peventstate);
{$if defined(has_sem_init) or defined(has_sem_open)}
var
res: cint;
err: cint;
begin
repeat
res:=sem_trywait(psem_t(plocaleventstate(state)^.FSem));
err:=fpgeterrno;
until (res<>0) and ((res<>-1) or (err<>ESysEINTR));
{$else has_sem_init or has_sem_open}
var
fds: TFDSet;
tv : timeval;
begin
tv.tv_sec:=0;
tv.tv_usec:=0;
fpFD_ZERO(fds);
fpFD_SET(PFilDes(plocaleventstate(state)^.FSem)^[0],fds);
{$if not defined(has_sem_init) and not defined(has_sem_open)}
pthread_mutex_lock(@plocaleventstate(state)^.feventsection);
Try
while fpselect(PFilDes(plocaleventstate(state)^.FSem)^[0],@fds,nil,nil,@tv) > 0 do
cSemaphoreWait(plocaleventstate(state)^.FSem);
try
{$endif}
while (cSemaphoreTryWait(plocaleventstate(state)^.FSem) = tw_semwasunlocked) do
;
{$if not defined(has_sem_init) and not defined(has_sem_open)}
finally
pthread_mutex_unlock(@plocaleventstate(state)^.feventsection);
end;
{$endif has_sem_init or has_sem_open}
{$endif}
end;
procedure IntbasiceventSetEvent(state:peventstate);
Var
{$if defined(has_sem_init) or defined(has_sem_open)}
Value : Longint;
res : cint;
err : cint;
{$if defined(has_sem_init) or defined(has_sem_open)}
Value : Longint;
{$else}
fds: TFDSet;
tv : timeval;
@ -668,7 +716,11 @@ begin
tv.tv_usec:=0;
fpFD_ZERO(fds);
fpFD_SET(PFilDes(plocaleventstate(state)^.FSem)^[0],fds);
if fpselect(PFilDes(plocaleventstate(state)^.FSem)^[0],@fds,nil,nil,@tv)=0 then
repeat
res:=fpselect(PFilDes(plocaleventstate(state)^.FSem)^[0]+1,@fds,nil,nil,@tv);
err:=fpgeterrno;
until (res>=0) or ((res=-1) and (err<>ESysEIntr));
if (res=0) then
cSemaphorePost(plocaleventstate(state)^.FSem);
{$endif has_sem_init or has_sem_open}
finally
@ -676,15 +728,104 @@ begin
end;
end;
function IntbasiceventWaitFor(Timeout : Cardinal;state:peventstate) : longint;
function IntbasiceventWaitFor(Timeout : Cardinal;state:peventstate) : longint;
var
i, loopcnt: cardinal;
timespec, timetemp, timeleft: ttimespec;
nanores, nanoerr: cint;
twres: TTryWaitResult;
begin
If TimeOut<>Cardinal($FFFFFFFF) then
result:=wrError
{ safely check whether we are being destroyed, if so immediately return. }
{ otherwise (under the same mutex) increase the number of waiters }
pthread_mutex_lock(@plocaleventstate(state)^.feventsection);
if (plocaleventstate(state)^.FDestroying) then
begin
pthread_mutex_unlock(@plocaleventstate(state)^.feventsection);
result := wrAbandoned;
exit;
end;
inc(plocaleventstate(state)^.FWaiters);
pthread_mutex_unlock(@plocaleventstate(state)^.feventsection);
if TimeOut=Cardinal($FFFFFFFF) then
begin
{ if no timeout, just wait until we are woken up }
cSemaphoreWait(plocaleventstate(state)^.FSem);
if not(plocaleventstate(state)^.FDestroying) then
result:=wrSignaled
else
result:=wrAbandoned;
end
else
begin
cSemaphoreWait(plocaleventstate(state)^.FSem);
result:=wrSignaled;
timespec.tv_sec:=0;
{ 500 miliseconds or less -> wait once for this duration }
if (timeout <= 500) then
loopcnt:=1
{ otherwise wake up every 500 msecs to check }
{ (we'll wait a little longer in total because }
{ we don't take into account the overhead) }
else
begin
loopcnt := timeout div 500;
timespec.tv_nsec:=500*1000000;
end;
result := wrTimeOut;
nanores := 0;
for i := 1 to loopcnt do
begin
{ in the last iteration, wait for the amount of time left }
if (i = loopcnt) then
timespec.tv_nsec:=(timeout mod 500) * 1000000;
timetemp:=timespec;
{ every time our sleep is interrupted for whatever reason, }
{ also check whether the semaphore has been posted in the }
{ mean time }
repeat
{$if not defined(has_sem_init) and not defined(has_sem_open)}
pthread_mutex_lock(@plocaleventstate(state)^.feventsection);
try
{$endif}
twres := cSemaphoreTryWait(plocaleventstate(state)^.FSem);
{$if not defined(has_sem_init) and not defined(has_sem_open)}
finally
pthread_mutex_unlock(@plocaleventstate(state)^.feventsection);
end;
{$endif}
case twres of
tw_error:
begin
result := wrError;
break;
end;
tw_semwasunlocked:
begin
result := wrSignaled;
break;
end;
end;
nanores:=fpnanosleep(@timetemp,@timeleft);
nanoerr:=fpgeterrno;
timetemp:=timeleft;
{ loop until 1) we slept complete interval; 2) an error occurred; }
{ 3) we're being destroyed }
until (nanores=0) or ((nanores<>0) and (nanoerr<>ESysEINTR)) or plocaleventstate(state)^.FDestroying;
{ adjust result for error or being destroyed }
if (nanores <> 0) then
result := wrError
else if plocaleventstate(state)^.FDestroying then
result := wrAbandoned;
{ break out of greater loop when we got the lock, when an error }
{ occurred, or when we are being destroyed }
if (result<>wrTimeOut) then
break;
end;
end;
if (result=wrSignaled) then
begin
if plocaleventstate(state)^.FManualReset then
begin
pthread_mutex_lock(@plocaleventstate(state)^.feventsection);
@ -693,9 +834,14 @@ begin
cSemaphorePost(plocaleventstate(state)^.FSem);
Finally
pthread_mutex_unlock(@plocaleventstate(state)^.feventsection);
end;
end;
end;
end;
{ don't put this above the previous if-block, because otherwise }
{ we can get errors in case an object is destroyed between the }
{ end of the wait/sleep loop and the signalling above. }
{ The pthread_mutex_unlock above takes care of the memory barrier }
interlockeddecrement(plocaleventstate(state)^.FWaiters);
end;
function intRTLEventCreate: PRTLEvent;

View File

@ -7,15 +7,71 @@ uses
sysutils,
classes;
Const
wrSignaled = 0;
wrTimeout = 1;
wrAbandoned= 2;
wrError = 3;
type
tc = class(tthread)
procedure execute; override;
end;
torder = (o_destroy, o_post, o_sleeppost, o_waittimeoutabandon, o_waittimeoutsignal);
thelper = class(tthread)
private
forder: torder;
public
constructor create(order: torder);
procedure execute; override;
end;
var
event: pEventState;
waiting: boolean;
constructor thelper.create(order: torder);
begin
forder:=order;
inherited create(false);
end;
procedure thelper.execute;
var
res: longint;
begin
case forder of
o_destroy:
basiceventdestroy(event);
o_post:
basiceventsetevent(event);
o_sleeppost:
begin
sleep(1000);
basiceventsetevent(event);
end;
o_waittimeoutabandon:
begin
res:=basiceventWaitFor(1000,event);
if (res<>wrAbandoned) then
begin
writeln('error 1');
halt(1);
end;
end;
o_waittimeoutsignal:
begin
res:=basiceventWaitFor(1000,event);
if (res<>wrSignaled) then
begin
writeln('error 2');
halt(2);
end;
end;
end;
end;
procedure tc.execute;
begin
{ make sure we don't exit before this thread has initialised, since }
@ -23,32 +79,59 @@ begin
{ problems for heaptrc as it goes over the memory map in its exit code }
waiting:=true;
{ avoid deadlocks/bugs from causing this test to never quit }
sleep(1000*20);
halt(1);
sleep(1000*10);
writeln('error 3');
halt(3);
end;
var
help: thelper;
begin
waiting:=false;
tc.create(false);
event := BasicEventCreate(nil,false,false,'bla');;
event := BasicEventCreate(nil,false,false,'bla');
basiceventSetEvent(event);
if (basiceventWaitFor(cardinal(-1),event) <> 0) then
if (basiceventWaitFor(cardinal(-1),event) <> wrSignaled) then
begin
writeln('error');
halt(1);
writeln('error 4');
halt(4);
end;
basiceventSetEvent(event);
if (basiceventWaitFor(1000,event) <> wrSignaled) then
begin
writeln('error 5');
halt(5);
end;
{ shouldn't change anything }
basiceventResetEvent(event);
basiceventSetEvent(event);
{ shouldn't change anything }
basiceventSetEvent(event);
if (basiceventWaitFor(cardinal(-1),event) <> 0) then
if (basiceventWaitFor(cardinal(-1),event) <> wrSignaled) then
begin
writeln('error');
halt(1);
writeln('error 6');
halt(6);
end;
{ make sure the two BasicSetEvents aren't cumulative }
if (basiceventWaitFor(1000,event) <> wrTimeOut) then
begin
writeln('error 7');
halt(7);
end;
help:=thelper.create(o_waittimeoutabandon);
basiceventdestroy(event);
help.waitfor;
help.free;
event := BasicEventCreate(nil,false,false,'bla');
help:=thelper.create(o_waittimeoutsignal);
basiceventSetEvent(event);
help.waitfor;
help.free;
basiceventdestroy(event);
while not waiting do
sleep(20);
end.