* new Intbasicevent*() implementation based on pthread_cond_signal by

Fabio Luis Girardi (mantis #9895)

git-svn-id: trunk@12857 -
This commit is contained in:
Jonas Maebe 2009-03-04 20:15:27 +00:00
parent be5709fcfd
commit b92df02778

View File

@ -53,6 +53,8 @@ interface
{$endif darwin} {$endif darwin}
{$endif} {$endif}
{$define basicevents_with_pthread_cond}
Procedure SetCThreadManager; Procedure SetCThreadManager;
implementation implementation
@ -561,7 +563,6 @@ function cIntSemaphoreInit(initvalue: boolean): Pointer;
var var
tid: string[31]; tid: string[31];
semname: string[63]; semname: string[63];
err: cint;
{$endif} {$endif}
begin begin
{$ifdef has_sem_init} {$ifdef has_sem_init}
@ -616,13 +617,15 @@ end;
type type
TPthreadCondition = pthread_cond_t;
TPthreadMutex = pthread_mutex_t; TPthreadMutex = pthread_mutex_t;
Tbasiceventstate=record Tbasiceventstate=record
FSem: Pointer; FCondVar: TPthreadCondition;
FEventSection: TPthreadMutex; FEventSection: TPthreadMutex;
FWaiters: longint; FWaiters: longint;
FIsSet,
FManualReset, FManualReset,
FDestroying: Boolean; FDestroying : Boolean;
end; end;
plocaleventstate = ^tbasiceventstate; plocaleventstate = ^tbasiceventstate;
// peventstate=pointer; // peventstate=pointer;
@ -634,7 +637,6 @@ Const
wrError = 3; wrError = 3;
function IntBasicEventCreate(EventAttributes : Pointer; AManualReset,InitialState : Boolean;const Name : ansistring):pEventState; function IntBasicEventCreate(EventAttributes : Pointer; AManualReset,InitialState : Boolean;const Name : ansistring):pEventState;
var var
MAttr : pthread_mutexattr_t; MAttr : pthread_mutexattr_t;
res : cint; res : cint;
@ -643,33 +645,14 @@ begin
plocaleventstate(result)^.FManualReset:=AManualReset; plocaleventstate(result)^.FManualReset:=AManualReset;
plocaleventstate(result)^.FWaiters:=0; plocaleventstate(result)^.FWaiters:=0;
plocaleventstate(result)^.FDestroying:=False; plocaleventstate(result)^.FDestroying:=False;
{$ifdef has_sem_init} plocaleventstate(result)^.FIsSet:=InitialState;
plocaleventstate(result)^.FSem:=cIntSemaphoreInit(initialstate); res := pthread_cond_init(@plocaleventstate(result)^.FCondVar, nil);
if plocaleventstate(result)^.FSem=nil then if (res <> 0) then
begin begin
FreeMem(result); FreeMem(result);
fpc_threaderror; fpc_threaderror;
end; end;
{$else}
{$ifdef has_sem_open}
plocaleventstate(result)^.FSem:=cIntSemaphoreOpen(PChar(Name),InitialState);
if (plocaleventstate(result)^.FSem = NIL) then
begin
FreeMem(result);
fpc_threaderror;
end;
{$else}
plocaleventstate(result)^.FSem:=cSemaphoreInit;
if (plocaleventstate(result)^.FSem = NIL) then
begin
FreeMem(result);
fpc_threaderror;
end;
if InitialState then
cSemaphorePost(plocaleventstate(result)^.FSem);
{$endif}
{$endif}
// plocaleventstate(result)^.feventsection:=nil;
res:=pthread_mutexattr_init(@MAttr); res:=pthread_mutexattr_init(@MAttr);
if res=0 then if res=0 then
begin begin
@ -681,112 +664,64 @@ begin
end end
else else
res:=pthread_mutex_init(@plocaleventstate(result)^.feventsection,nil); res:=pthread_mutex_init(@plocaleventstate(result)^.feventsection,nil);
pthread_mutexattr_destroy(@MAttr); pthread_mutexattr_destroy(@MAttr);
if res <> 0 then if res <> 0 then
begin begin
cSemaphoreDestroy(plocaleventstate(result)^.FSem); pthread_cond_destroy(@plocaleventstate(result)^.FCondVar);
FreeMem(result); FreeMem(result);
fpc_threaderror; fpc_threaderror;
end; end;
end; end;
procedure Intbasiceventdestroy(state:peventstate); procedure Intbasiceventdestroy(state:peventstate);
var
i: longint;
begin begin
{ safely mark that we are destroying this event } { safely mark that we are destroying this event }
pthread_mutex_lock(@plocaleventstate(state)^.feventsection); pthread_mutex_lock(@plocaleventstate(state)^.feventsection);
plocaleventstate(state)^.FDestroying:=true; plocaleventstate(state)^.FDestroying:=true;
{ wake up everyone who is waiting }
for i := 1 to plocaleventstate(state)^.FWaiters do { send a signal to all threads that are waiting }
cSemaphorePost(plocaleventstate(state)^.FSem); pthread_cond_broadcast(@plocaleventstate(state)^.FCondVar);
pthread_mutex_unlock(@plocaleventstate(state)^.feventsection); pthread_mutex_unlock(@plocaleventstate(state)^.feventsection);
{ now wait until they've finished their business } { now wait until they've finished their business }
while (plocaleventstate(state)^.FWaiters <> 0) do while (plocaleventstate(state)^.FWaiters <> 0) do
cThreadSwitch; cThreadSwitch;
{ and clean up } { and clean up }
cSemaphoreDestroy(plocaleventstate(state)^.FSem); pthread_cond_destroy(@plocaleventstate(state)^.Fcondvar);
pthread_mutex_destroy(@plocaleventstate(state)^.FEventSection);
dispose(plocaleventstate(state)); dispose(plocaleventstate(state));
end; end;
procedure IntbasiceventResetEvent(state:peventstate); procedure IntbasiceventResetEvent(state:peventstate);
begin begin
{$if not defined(has_sem_init) and not defined(has_sem_open)}
pthread_mutex_lock(@plocaleventstate(state)^.feventsection); pthread_mutex_lock(@plocaleventstate(state)^.feventsection);
try plocaleventstate(state)^.fisset:=false;
{$endif} pthread_mutex_unlock(@plocaleventstate(state)^.feventsection);
while (cSemaphoreTryWait(plocaleventstate(state)^.FSem) = tw_semwasunlocked) do
;
{$if not defined(has_sem_init) and not defined(has_sem_open)}
finally
pthread_mutex_unlock(@plocaleventstate(state)^.feventsection);
end;
{$endif}
end; end;
procedure IntbasiceventSetEvent(state:peventstate); procedure IntbasiceventSetEvent(state:peventstate);
Var
res : cint;
err : cint;
{$if defined(has_sem_init) or defined(has_sem_open)}
Value : Longint;
{$else}
fds: TFDSet;
tv : timeval;
{$endif}
begin begin
pthread_mutex_lock(@plocaleventstate(state)^.feventsection); pthread_mutex_lock(@plocaleventstate(state)^.feventsection);
Try Try
{$if defined(has_sem_init) or defined(has_sem_open)} plocaleventstate(state)^.Fisset:=true;
if (sem_getvalue(plocaleventstate(state)^.FSem,@value) <> -1) then if not(plocaleventstate(state)^.FManualReset) then
begin pthread_cond_signal(@plocaleventstate(state)^.Fcondvar)
if Value=0 then
cSemaphorePost(plocaleventstate(state)^.FSem);
end
else if (fpgetCerrno = ESysENOSYS) then
{ not yet implemented on Mac OS X 10.4.8 }
begin
repeat
res:=sem_trywait(psem_t(plocaleventstate(state)^.FSem));
err:=fpgetCerrno;
until ((res<>-1) or (err<>ESysEINTR));
{ now we've either decreased the semaphore by 1 (if it was }
{ not zero), or we've done nothing (if it was already zero) }
{ -> increase by 1 and we have the same result as }
{ increasing by 1 only if it was 0 }
cSemaphorePost(plocaleventstate(state)^.FSem);
end
else else
fpc_threaderror; pthread_cond_broadcast(@plocaleventstate(state)^.Fcondvar);
{$else has_sem_init or has_sem_open}
tv.tv_sec:=0;
tv.tv_usec:=0;
fpFD_ZERO(fds);
fpFD_SET(PFilDes(plocaleventstate(state)^.FSem)^[0],fds);
repeat
res:=fpselect(PFilDes(plocaleventstate(state)^.FSem)^[0]+1,@fds,nil,nil,@tv);
err:=fpgeterrno;
until (res>=0) or ((res=-1) and (err<>ESysEIntr));
if (res=0) then
cSemaphorePost(plocaleventstate(state)^.FSem);
{$endif has_sem_init or has_sem_open}
finally finally
pthread_mutex_unlock(@plocaleventstate(state)^.feventsection); pthread_mutex_unlock(@plocaleventstate(state)^.feventsection);
end; end;
end; end;
function IntbasiceventWaitFor(Timeout : Cardinal;state:peventstate) : longint; function IntbasiceventWaitFor(Timeout : Cardinal;state:peventstate) : longint;
var var
i, loopcnt: cardinal; timespec: ttimespec;
timespec, timetemp, timeleft: ttimespec; errres: cint;
nanores, nanoerr: cint; isset: boolean;
twres: TTryWaitResult; tnow : timeval;
lastloop: boolean;
begin begin
{ safely check whether we are being destroyed, if so immediately return. } { safely check whether we are being destroyed, if so immediately return. }
{ otherwise (under the same mutex) increase the number of waiters } { otherwise (under the same mutex) increase the number of waiters }
@ -797,127 +732,58 @@ begin
result := wrAbandoned; result := wrAbandoned;
exit; exit;
end; end;
inc(plocaleventstate(state)^.FWaiters); { not a regular inc() because it may happen simulatneously with the }
pthread_mutex_unlock(@plocaleventstate(state)^.feventsection); { interlockeddecrement() at the end }
interlockedincrement(plocaleventstate(state)^.FWaiters);
if TimeOut=Cardinal($FFFFFFFF) then //Wait without timeout using pthread_cond_wait
if Timeout = $FFFFFFFF then
begin begin
{ if no timeout, just wait until we are woken up } while (not plocaleventstate(state)^.FIsSet) and (not plocaleventstate(state)^.FDestroying) do
cSemaphoreWait(plocaleventstate(state)^.FSem); pthread_cond_wait(@plocaleventstate(state)^.Fcondvar, @plocaleventstate(state)^.feventsection);
if not(plocaleventstate(state)^.FDestroying) then
result:=wrSignaled
else
result:=wrAbandoned;
end end
else else
begin begin
{$ifdef has_sem_timedwait} //Wait with timeout using pthread_cont_timedwait
fpgettimeofday(@timespec,nil); fpgettimeofday(@tnow,nil);
inc(timespec.tv_nsec, (timeout mod 1000) * 1000000); timespec.tv_sec := tnow.tv_sec + (clong(timeout) div 1000);
inc(timespec.tv_sec, timeout div 1000); timespec.tv_nsec := (clong(timeout) mod 1000)*1000000 + tnow.tv_usec*1000;
if timespec.tv_nsec > 1000000000 then if timespec.tv_nsec >= 1000000000 then
begin
dec(timespec.tv_nsec, 1000000000);
inc(timespec.tv_sec);
end;
nanores := cSemaphoreTimedWait(plocaleventstate(state)^.FSem, timespec);
if nanores = 0 then
result := wrSignaled
else if nanores = ESysETIMEDOUT then
result := wrTimeout
else
result := wrError;
{$else}
timespec.tv_sec:=0;
{ 20 miliseconds or less -> wait once for this duration }
if (timeout <= 20) then
loopcnt:=1
{ otherwise wake up every 20 msecs to check }
{ (we'll wait a little longer in total because }
{ we don't take into account the overhead) }
else
begin begin
loopcnt := timeout div 20; inc(timespec.tv_sec);
timespec.tv_nsec:=20*1000000; dec(timespec.tv_nsec, 1000000000);
end; end;
result := wrTimeOut; errres:=0;
nanores := 0; while (not plocaleventstate(state)^.FDestroying) and (not plocaleventstate(state)^.FIsSet) and (errres<>ESysETIMEDOUT) do
errres:=pthread_cond_timedwait(@plocaleventstate(state)^.Fcondvar, @plocaleventstate(state)^.feventsection, @timespec);
end;
for i := 1 to loopcnt do isset := plocaleventstate(state)^.FIsSet;
begin
{ in the last iteration, wait for the amount of time left } { if ManualReset=false, reset the event immediately. }
if (i = loopcnt) then if (plocaleventstate(state)^.FManualReset=false) then
timespec.tv_nsec:=(timeout mod 20) * 1000000; plocaleventstate(state)^.FIsSet := false;
timetemp:=timespec;
lastloop:=false; //check the results...
{ every time our sleep is interrupted for whatever reason, } if plocaleventstate(state)^.FDestroying then
{ also check whether the semaphore has been posted in the } Result := wrAbandoned
{ mean time } else
repeat if IsSet then
{$if not defined(has_sem_init) and not defined(has_sem_open)} Result := wrSignaled
pthread_mutex_lock(@plocaleventstate(state)^.feventsection); else
try begin
{$endif} if errres=ESysETIMEDOUT then
twres := cSemaphoreTryWait(plocaleventstate(state)^.FSem); Result := wrTimeout
{$if not defined(has_sem_init) and not defined(has_sem_open)} else
finally Result := wrError;
pthread_mutex_unlock(@plocaleventstate(state)^.feventsection); end;
end;
{$endif} pthread_mutex_unlock(@plocaleventstate(state)^.feventsection);
case twres of
tw_error: { don't put this above the previous pthread_mutex_unlock, because }
begin { otherwise we can get errors in case an object is destroyed between }
result := wrError; { end of the wait/sleep loop and the signalling above. }
break; { The pthread_mutex_unlock above takes care of the memory barrier }
end;
tw_semwasunlocked:
begin
result := wrSignaled;
break;
end;
end;
if (lastloop) then
break;
nanores:=fpnanosleep(@timetemp,@timeleft);
nanoerr:=fpgeterrno;
timetemp:=timeleft;
lastloop:=(i=loopcnt);
{ loop until 1) we slept complete interval (except if last for-loop }
{ in which case we try to lock once more); 2) an error occurred; }
{ 3) we're being destroyed }
until ((nanores=0) and not lastloop) or ((nanores<>0) and (nanoerr<>ESysEINTR)) or plocaleventstate(state)^.FDestroying;
{ adjust result being destroyed or error (in this order, since }
{ if we're being destroyed the "error" could be ESysEINTR, which }
{ is not a real error }
if plocaleventstate(state)^.FDestroying then
result := wrAbandoned
else if (nanores <> 0) then
result := wrError;
{ break out of greater loop when we got the lock, when an error }
{ occurred, or when we are being destroyed }
if (result<>wrTimeOut) then
break;
end;
{$endif}
end;
if (result=wrSignaled) then
begin
if plocaleventstate(state)^.FManualReset then
begin
pthread_mutex_lock(@plocaleventstate(state)^.feventsection);
Try
intbasiceventresetevent(State);
cSemaphorePost(plocaleventstate(state)^.FSem);
Finally
pthread_mutex_unlock(@plocaleventstate(state)^.feventsection);
end;
end;
end;
{ don't put this above the previous if-block, because otherwise }
{ we can get errors in case an object is destroyed between the }
{ end of the wait/sleep loop and the signalling above. }
{ The pthread_mutex_unlock above takes care of the memory barrier }
interlockeddecrement(plocaleventstate(state)^.FWaiters); interlockeddecrement(plocaleventstate(state)^.FWaiters);
end; end;