* several fixes for TMultiReadExclusiveWriteSynchronizer, based on patch by

Derek (mantis #28830)
   o ability for a thread to acquire a readlock when it already holds a
     write lock, or vice versa
   o detect when thread1 releases a lock while it hadn't acquired one
   o correct result of beginwrite (return true only when another thread
     held/acquired a write lock before the current thread got it)
   o extra compared to Derek's patch: replaced some additional
     (mostly pre-existing) interlocked*() hacks with proper memory
     barriers, made formatting consistent, fixed compilation on platforms
     where tthreadid is not an integer type, improved tthreadid hashing
     function for 64 bit platforms, fixed some comments

git-svn-id: trunk@34678 -
This commit is contained in:
Jonas Maebe 2016-10-08 20:45:45 +00:00
parent 79faa8f01d
commit 265923a2ee
3 changed files with 680 additions and 95 deletions

View File

@ -40,17 +40,28 @@ type
TMultiReadExclusiveWriteSynchronizer = class(TInterfacedObject,IReadWriteSync)
{$ifdef FPC_HAS_FEATURE_THREADING}
private
freaderqueue: peventstate;
fwritelock : TRtlCriticalSection;
fwaitingwriterlock: prtlevent;
freadercount: cardinal;
fwritelocked: longint;
fThreadList: array [0..15] of Pointer;
freaderqueue: peventstate;
fwritelock : TRtlCriticalSection;
fwaitingwriterlock: prtlevent;
fWriterThreadID: TThreadID;
fRevisionLevel: cardinal;
fwriterequests: cardinal;
factivethreads: cardinal;
protected
function ThreadIDtoIndex( aThreadID: TThreadID ): integer; inline;
function GetThreadInfo(AutoCreate: Boolean): Pointer;
procedure RemoveThread(AThreadInfo: Pointer);
public
constructor Create; virtual;
destructor Destroy; override;
function Beginwrite : boolean;
procedure Endwrite;
procedure Beginread;
procedure Endread;
constructor Create; virtual;
destructor Destroy; override;
function Beginwrite : boolean;
procedure Endwrite;
procedure Beginread;
procedure Endread;
property RevisionLevel: cardinal read fRevisionLevel;
property WriterThreadID: TThreadID read fWriterThreadID;
{$endif FPC_HAS_FEATURE_THREADING}
end;
TMREWException = class(Exception);

View File

@ -44,17 +44,33 @@ begin
end;
type
PMREWThreadInfo = ^TMREWThreadInfo;
TMREWThreadInfo = record
Next: PMREWThreadInfo;
Active: longint;
RefCount: LongInt;
ThreadID: TThreadID;
end;
const
cInUse: LongInt = MaxInt;
cAvail: LongInt = 0;
const
cNewReader : LongInt = - $1;
cNewWriter : LongInt = $10000;
cReadMask : LongInt = $0000FFFF;
cWriteMask : LongInt = $7FFF0000;
constructor TMultiReadExclusiveWriteSynchronizer.Create;
begin
System.InitCriticalSection(fwritelock);
fwaitingwriterlock:=RTLEventCreate;
RTLEventResetEvent(fwaitingwriterlock);
{ atomic initialisation because BeginRead does not contain a memory barrier
before it modifies freadercount (with an atomic operation), so we have
to ensure that it sees this initial value }
System.InterlockedExchange(freadercount,0);
fwritelocked:=0;
fwriterequests:=0;
factivethreads:=0;
freaderqueue:=BasicEventCreate(nil,true,false,'');
{ synchronize initialization with later reads/writes }
ReadWriteBarrier;
@ -62,70 +78,151 @@ end;
destructor TMultiReadExclusiveWriteSynchronizer.Destroy;
var
p,q: PMREWThreadInfo;
i: integer;
begin
{ synchronize destruction with previous endread/write operations }
ReadWriteBarrier;
System.DoneCriticalSection(fwritelock);
RtlEventDestroy(fwaitingwriterlock);
BasicEventDestroy(freaderqueue);
{ Clean up thread info }
for i:=Low(fThreadList) to High(fThreadList) do
begin
q:=fThreadList[i];
fThreadList[i]:=nil;
while q<>nil do
begin
p:=q;
q:=q^.Next;
FreeMem(p);
end;
end;
end;
function TMultiReadExclusiveWriteSynchronizer.Beginwrite : boolean;
var
p: PMREWThreadInfo;
begin
{ wait for any other writers that may be in progress }
System.EnterCriticalSection(fwritelock);
{ it is possible that earlier on we missed waiting on the
fwaitingwriterlock and that it's still set (must be done
after acquiring the fwritelock, because otherwise one
writer could reset the fwaitingwriterlock of another one
that's about to wait on it) }
RTLeventResetEvent(fwaitingwriterlock);
{ new readers have to block from now on; writers get priority to avoid
writer starvation (since they have to compete with potentially many
concurrent readers and other writers) }
BasicEventResetEvent(freaderqueue);
{ Result indicates whether the protected memory was not modified,
that is, it is set to false if another writer could have chagned
memory}
Result:=True;
{ for quick checking by candidate-readers -- use interlockedincrement/
decrement instead of setting 1/0, because a single thread can
recursively acquire the write lock multiple times }
System.InterlockedIncrement(fwritelocked);
{ order increment vs freadercount check }
ReadWriteBarrier;
{ Count pending not granted requests so writes always take priority over
read requests}
System.InterlockedIncrement(fwriterequests);
{ wait until all readers are gone. The writer always first sets
fwritelocked and then checks freadercount, while the readers
always first increase freadercount and then check fwritelocked }
while freadercount<>0 do
RTLEventWaitFor(fwaitingwriterlock);
{ Get per thread counter }
p:=PMREWThreadInfo( GetThreadInfo(True) );
{ Make sure that out-of-order execution cannot already perform reads
inside the critical section before the lock has been acquired }
ReadBarrier;
if System.TryEnterCriticalSection(fwritelock)=0 then
begin
{ TryEnterCriticalSection failed, so not first in the write lock queue }
Result:=False;
{ we have the writer lock, and all readers are gone }
result:=true;
{ If we hold a read lock, then a deadlock will result.. This is because
the first thread in the write queue (holding fwritelock) does not have
mutexes write lock, as it must be waiting on this thread to release
its' read lock }
if p^.RefCount > 0 then
begin
System.InterlockedDecrement(fwriterequests);
raise TMREWException.Create('Deadlock detected');
end;
{ wait for any other writers that may be in progress }
System.EnterCriticalSection(fwritelock);
end;
{ Need to synchronize with readers only when this is the first
write request by this thread }
if (p^.RefCount and cWriteMask)=0 then
begin
{ Count active threads rather than readers. A write request can
be granted whenever the count has reduced to one }
if p^.RefCount=0 then
begin
{ flush increment of fwriterequests }
WriteBarrier;
System.InterlockedIncrement(factivethreads);
end;
{ new readers have to block from now on; writers get priority to avoid
writer starvation (since they have to compete with potentially many
concurrent readers and other writers) }
BasicEventResetEvent(freaderqueue);
{ it is possible that earlier on we missed waiting on the
fwaitingwriterlock and that it's still set (must be done
after acquiring the fwritelock, because otherwise one
writer could reset the fwaitingwriterlock of another one
that's about to wait on it) }
RTLeventResetEvent(fwaitingwriterlock);
{ wait until we are the only active thread. Get threadinfo
(needs to become a volatile read) }
while factivethreads>1 do
RTLEventWaitFor(fwaitingwriterlock);
{ Make sure that out-of-order execution cannot already perform reads
inside the critical section before the lock has been acquired }
ReadBarrier;
end;
{ Count write lock acquisitions by thread }
Inc(p^.RefCount,cNewWriter);
end;
procedure TMultiReadExclusiveWriteSynchronizer.Endwrite;
var
p: PMREWThreadInfo;
begin
{ Finish all writes inside the section so that everything executing
afterwards will certainly see those results }
WriteBarrier;
p:=PMREWThreadInfo( GetThreadInfo(False) );
{ signal potential readers that the coast is clear if all recursive
write locks have been freed }
if System.InterlockedDecrement(fwritelocked)=0 then
{ Protect against EndWrite called out of sequence blocking lock }
if (p<>nil) and
((p^.RefCount and cWriteMask)<>0) then
begin
{ wake up waiting readers (if any); do not check first whether freadercount
is <> 0, because the InterlockedDecrement in the while loop of BeginRead
can have already occurred, so a single reader may be about to wait on
freaderqueue even though freadercount=0. Setting an event multiple times
is no problem. }
BasicEventSetEvent(freaderqueue);
end;
{ free the writer lock so another writer can become active }
System.LeaveCriticalSection(fwritelock);
{ Update per thread counter before releasing next write thread }
Dec(p^.RefCount,cNewWriter);
{ Finish all writes inside the section, and the update of the RefCount,
so that everything executing afterwards will certainly see these
results }
WriteBarrier;
{ Reduce active thread count assuming it was not previously a read lock }
if p^.RefCount=0 then
begin
System.InterlockedDecrement(factivethreads);
{ order w.r.t. decrement of fwriterequests below }
WriteBarrier;
end;
{ signal potential readers that the coast is clear if all recursive
write locks have been freed }
{ Also test for pending write requests }
if System.InterlockedDecrement(fwriterequests)=0 then
begin
{ No more writers pending, wake any pending readers }
BasicEventSetEvent(freaderqueue);
end;
{ free the writer lock so another writer can become active }
System.LeaveCriticalSection(fwritelock);
{ Remove reference to thread if not in use any more }
if p^.RefCount=0 then
RemoveThread(p);
end
else
raise TMREWException.Create('EndWrite called before BeginWrite');
end;
@ -135,49 +232,166 @@ Const
wrTimeout = 1;
wrAbandoned= 2;
wrError = 3;
var
p: PMREWThreadInfo;
begin
System.InterlockedIncrement(freadercount);
{ order vs fwritelocked check }
ReadWriteBarrier;
{ wait until there is no more writer }
while fwritelocked<>0 do
{ Check if we already have a lock, if so grant immediate access }
p:=PMREWThreadInfo( GetThreadInfo(True) );
if p^.RefCount=0 then
begin
{ there's a writer busy or wanting to start -> wait until it's
finished; a writer may already be blocked in the mean time, so
wake it up if we're the last to go to sleep -- order frwritelocked
check above vs freadercount check/modification below }
ReadWriteBarrier;
if System.InterlockedDecrement(freadercount)=0 then
RTLEventSetEvent(fwaitingwriterlock);
if (BasicEventWaitFor(high(cardinal),freaderqueue) in [wrAbandoned,wrError]) then
raise Exception.create('BasicEventWaitFor failed in TMultiReadExclusiveWriteSynchronizer.Beginread');
{ and try again: first increase freadercount, only then check
fwritelocked }
System.InterlockedIncrement(freadercount);
{ order vs fwritelocked check at the start of the loop }
ReadWriteBarrier;
{ Wanted non-recursive read lock, so increase active threads }
System.InterlockedIncrement(factivethreads);
{ wait until there are no more writer active or pending }
ReadBarrier;
{ must make read volatile }
while fwriterequests<>0 do
begin
{ This thread is not active }
if System.InterlockedDecrement(factivethreads)<>0 then
RTLEventSetEvent(fwaitingwriterlock);
if (BasicEventWaitFor(high(cardinal),freaderqueue) in [wrAbandoned,wrError]) then
raise TMREWException.create('BasicEventWaitFor failed in TMultiReadExclusiveWriteSynchronizer.Beginread');
{ Try again to make this thread active }
System.InterlockedIncrement(factivethreads);
{ order w.r.t. reading fwriterequests }
ReadBarrier;
end;
{ Make sure that out-of-order execution cannot perform reads
inside the critical section before the lock has been acquired }
ReadBarrier;
end;
{ Count read lock acquisitions by thread: Inc(p^.RefCount,cNewReader) }
Inc(p^.RefCount);
end;
procedure TMultiReadExclusiveWriteSynchronizer.Endread;
var
p: PMREWThreadInfo;
begin
{ order freadercount decrement to all operations in the critical section
(otherwise, freadercount could become zero in another thread/cpu before
all reads in the protected section were committed, so a writelock could
become active and change things that we will still see) }
ReadWriteBarrier;
{ If no more readers, wake writer in the ready-queue if any. Since a writer
always first sets fwritelocked and then checks the freadercount (with a
barrier in between) first modifying freadercount and then checking fwritelock
ensures that we cannot miss one of the events regardless of execution
order. }
if System.InterlockedDecrement(freadercount)=0 then
p:=PMREWThreadInfo( GetThreadInfo(False) );
if (p<>nil) and
((p^.RefCount and cReadMask)<>0) then
begin
{ order fwritelocked check vs freadercount modification }
ReadWriteBarrier;
if fwritelocked<>0 then
RTLEventSetEvent(fwaitingwriterlock);
{ Update per thread counter: Dec(p^.RefCount,cNewReader) }
Dec(p^.RefCount);
{ if this is the last recursive call }
if p^.RefCount=0 then
begin
{ Thread no longer has an active lock }
{ If no more readers, wake writer in the ready-queue if any.
Every queued thread requesting a write lock increments fwriterequests,
and the first queued writer thread checks factivethreads is active
(will be set already during lock promotion) }
if System.InterlockedDecrement(factivethreads)=1 then
begin
{ order w.r.t. access to factivethreads }
ReadBarrier;
if fwriterequests>0 then
RTLEventSetEvent(fwaitingwriterlock);
end;
{ remove reference to this thread }
RemoveThread(p);
end;
end
else
raise TMREWException.Create('EndRead called before BeginRead');
end;
function TMultiReadExclusiveWriteSynchronizer.ThreadIDtoIndex(aThreadID: TThreadID): integer;
begin
Result:=
(
ptruint(aThreadID) xor (ptruint(aThreadID) shr 12)
{$ifdef cpu64}
xor (ptruint(aThreadID) shr 32) xor (ptruint(aThreadID) shr 36) xor (ptruint(aThreadID) shr 48)
{$endif}
) and $FFFF;
Result:=(Result xor (Result shr 4)) and $0F; // Return range 0..15
end;
function TMultiReadExclusiveWriteSynchronizer.GetThreadInfo(AutoCreate: Boolean): Pointer;
var
p: PMREWThreadInfo;
AThreadID: TThreadID;
FreeSlot: Boolean;
OldState: LongInt;
Index: integer;
begin
FreeSlot:=False;
AThreadID:=ThreadID;
Index:=ThreadIDtoIndex( AThreadID );
p:=PMREWThreadInfo(fThreadList[Index]);
while (p<>nil) and
(p^.ThreadID<>AThreadID) do
begin
if p^.Active=cAvail then // Is slot available for use
FreeSlot:=True; // Yes, remember in case we need it as this is a new thread
p:=p^.Next;
ReadBarrier;
end;
if p=nil then
begin
{ count threads with locks }
if FreeSlot then
begin
p:=fThreadList[Index];
while (p<>nil) do
begin
if p^.Active=cAvail then
begin
OldState:=InterlockedExchange( p^.Active, cInUse );
if OldState=cAvail then
begin
p^.ThreadID:=AThreadID; // Tag to thread
Break;
end;
end;
p:=p^.Next;
ReadBarrier;
end;
end;
if p=nil then
begin
p:=PMREWThreadInfo(AllocMem(SizeOf(TMREWThreadInfo)));
p^.ThreadID:=AThreadID;
p^.RefCount:=0;
p^.Active:=cInUse;
{ Now insert into the chain header }
p^.Next:=p;
WriteBarrier;
{ other threads will spin (loop) after the InterlockedExchange
until the field "next" is written, then this node is first in
the forward linked list }
p^.Next:=System.InterlockedExchange(fThreadList[Index],p);
end;
end;
Result:=p;
end;
procedure TMultiReadExclusiveWriteSynchronizer.RemoveThread(AThreadInfo: Pointer);
var
p: PMREWThreadInfo;
begin
p:=PMREWThreadInfo(AThreadInfo);
if p<>nil then
begin
{ Prevent matching during GetThreadInfo }
p^.ThreadID:=tthreadid(-1);
WriteBarrier;
{ Mark slot available }
p^.Active:=cAvail;
end;
end;
{$endif FPC_HAS_FEATURE_THREADING}

View File

@ -11,7 +11,9 @@ uses
var
lock: TMultiReadExclusiveWriteSynchronizer;
event1, event2: prtlevent;
gcount: longint;
gotdeadlockexception,
waiting: boolean;
type
@ -36,6 +38,49 @@ type
procedure execute; override;
end;
treadwritecounter = class(tcounter)
private
ftrywriteupgrade: boolean;
public
constructor create(trywriteupgrade: boolean);
procedure execute; override;
end;
tdeadlock1 = class(tthread)
procedure execute; override;
end;
tdeadlock2 = class(tthread)
procedure execute; override;
end;
tdoublereadonewrite1 = class(tthread)
procedure execute; override;
end;
tdoublereadonewrite2 = class(tthread)
procedure execute; override;
end;
twrongthreadendacquire = class(tthread)
ftestwrongreadrelease: boolean;
constructor create(testwrongreadrelease: boolean);
procedure execute; override;
end;
twrongthreadendrelease = class(tthread)
ftestwrongreadrelease: boolean;
constructor create(testwrongreadrelease: boolean);
procedure execute; override;
end;
tdoublewrite = class(tthread)
fsecondwritethread: boolean;
constructor create(secondwritethread: boolean);
procedure execute; override;
end;
constructor tcounter.create;
begin
{ create suspended }
@ -100,7 +145,199 @@ procedure twritecounter.execute;
sleep(r);
end;
end;
constructor treadwritecounter.create(trywriteupgrade: boolean);
begin
ftrywriteupgrade:=trywriteupgrade;
inherited create;
end;
procedure treadwritecounter.execute;
var
i: longint;
l: longint;
r: longint;
begin
for i:=1 to 100000 do
begin
lock.beginread;
if ftrywriteupgrade and
((i=50000) or
(random(10000)=0)) then
begin
inc(flocalcount);
lock.beginwrite;
l:=gcount;
{ guarantee at least one sleep }
if i=50000 then
sleep(20+random(30))
else if (random(5)=0) then
sleep(20);
lock.beginwrite;
gcount:=l+1;
lock.endwrite;
lock.endwrite;
end;
lock.endread;
r:=random(30000);
if (r=0) then
sleep(30);
end;
end;
procedure tdeadlock1.execute;
var
localgotdeadlockexception: boolean;
begin
localgotdeadlockexception:=false;
lock.beginread;
RTLEventSetEvent(event2);
RTLEventWaitFor(event1);
try
lock.beginwrite;
except
localgotdeadlockexception:=true;
gotdeadlockexception:=true;
end;
if not localgotdeadlockexception then
lock.endwrite;
lock.endread;
end;
procedure tdeadlock2.execute;
var
localgotdeadlockexception: boolean;
begin
localgotdeadlockexception:=false;
lock.beginread;
RTLEventSetEvent(event1);
RTLEventWaitFor(event2);
try
lock.beginwrite;
except
localgotdeadlockexception:=true;
gotdeadlockexception:=true;
end;
if not localgotdeadlockexception then
lock.endwrite;
lock.endread;
end;
procedure tdoublereadonewrite1.execute;
begin
// 1)
lock.beginread;
// 2)
RTLEventSetEvent(event2);
// 5)
RTLEventWaitFor(event1);
{ ensure tdoublereadonewrite2 has time to get stuck in beginwrite }
sleep(500);
// 6)
lock.beginread;
// 7)
lock.endread;
// 8)
lock.endread;
end;
procedure tdoublereadonewrite2.execute;
begin
// 3)
RTLEventWaitFor(event2);
// 4)
RTLEventSetEvent(event1);
// 4a -- block until after 8)
lock.beginwrite;
// 9)
lock.endwrite;
end;
constructor twrongthreadendacquire.create(testwrongreadrelease: boolean);
begin
ftestwrongreadrelease:=testwrongreadrelease;
inherited create(false);
end;
procedure twrongthreadendacquire.execute;
begin
if ftestwrongreadrelease then
lock.beginread
else
lock.beginwrite;
RTLEventSetEvent(event1);
RTLEventWaitFor(event2);
try
if ftestwrongreadrelease then
lock.endread
else
lock.endwrite;
except
halt(30);
end;
end;
constructor twrongthreadendrelease.create(testwrongreadrelease: boolean);
begin
ftestwrongreadrelease:=testwrongreadrelease;
inherited create(false);
end;
procedure twrongthreadendrelease.execute;
var
caught: boolean;
begin
RTLEventWaitFor(event1);
caught:=false;
try
if ftestwrongreadrelease then
lock.endread
else
lock.endwrite;
except
caught:=true;
end;
RTLEventSetEvent(event2);
if not caught then
halt(40);
end;
constructor tdoublewrite.create(secondwritethread: boolean);
begin
fsecondwritethread:=secondwritethread;
inherited create(false);
end;
procedure tdoublewrite.execute;
begin
if fsecondwritethread then
begin
RTLEventWaitFor(event1);
if lock.beginwrite then
halt(50);
end
else
begin
if not lock.beginwrite then
halt(51);
RTLEventSetEvent(event1);
// give the other thread the time to get to its beginwrite call
Sleep(500);
end;
lock.endwrite;
end;
procedure terrorcheck.execute;
@ -119,17 +356,66 @@ end;
var
r1,r2,r3,r4,r5,r6: treadcounter;
w1,w2,w3,w4: twritecounter;
rw1,rw2,rw3: treadwritecounter;
d1: tdeadlock1;
d2: tdeadlock2;
dr1: tdoublereadonewrite1;
dr2: tdoublereadonewrite2;
wr1: twrongthreadendacquire;
wr2: twrongthreadendrelease;
dw1, dw2: tdoublewrite;
caught: boolean;
begin
waiting:=false;
terrorcheck.create(false);
randomize;
lock:=TMultiReadExclusiveWriteSynchronizer.create;
event1:=RTLEventCreate;
event2:=RTLEventCreate;
{ verify that the lock is recursive }
lock.beginwrite;
lock.beginwrite;
if not lock.beginwrite then
halt(10);
if not lock.beginwrite then
halt(11);
lock.endwrite;
lock.endwrite;
{ verify that we can upgrade a read lock to a write lock }
lock.beginread;
if not lock.beginwrite then
halt(12);
lock.endwrite;
lock.endread;
{ verify that owning a write lock does not prevent getting a read lock }
if not lock.beginwrite then
halt(13);
lock.beginread;
lock.endread;
lock.endwrite;
{ verify that calling endread without beginread throws an exception }
caught:=false;
try
lock.endread;
except
caught:=true;
end;
if not caught then
halt(14);
{ verify that calling endwrite without beginwrite throws an exception }
caught:=false;
try
lock.endwrite;
except
caught:=true;
end;
if not caught then
halt(15);
{ first try some writers }
w1:=twritecounter.create;
w2:=twritecounter.create;
@ -201,6 +487,80 @@ begin
w1.free;
w2.free;
{ mixed readers and writers without proper synchronisation }
gcount:=0;
rw1:=treadwritecounter.create(true);
rw2:=treadwritecounter.create(false);
rw3:=treadwritecounter.create(false);
rw1.resume;
rw2.resume;
rw3.resume;
rw1.waitfor;
rw2.waitfor;
rw3.waitfor;
{ must not have caused any data races }
if (gcount<>rw1.localcount+rw2.localcount+rw3.localcount) then
begin
writeln('error 5');
halt(5);
end;
RTLEventResetEvent(event1);
RTLEventResetEvent(event2);
{ check deadlock detection }
d1:=tdeadlock1.create(false);
d2:=tdeadlock2.create(false);
d1.waitfor;
d2.waitfor;
if not gotdeadlockexception then
halt(6);
d1.free;
d2.free;
{ check that a waiting writer does not block a reader trying to get
a recursive read lock it already holds }
dr1:=tdoublereadonewrite1.create(false);
dr2:=tdoublereadonewrite2.create(false);
dr1.waitfor;
dr2.waitfor;
dr1.free;
dr2.free;
{ check that releasing a lock in another thread compared to where it
was acquired causes an exception }
wr1:=twrongthreadendacquire.create(true);
wr2:=twrongthreadendrelease.create(true);
wr1.waitfor;
wr2.waitfor;
wr1.free;
wr2.free;
wr1:=twrongthreadendacquire.create(false);
wr2:=twrongthreadendrelease.create(false);
wr1.waitfor;
wr2.waitfor;
wr1.free;
wr2.free;
dw1:=tdoublewrite.create(false);
dw2:=tdoublewrite.create(true);
dw1.waitfor;
dw2.waitfor;
dw1.free;
dw2.free;
RTLEventDestroy(event1);
RTLEventDestroy(event2);
lock.free;
while not waiting do