* Thread-safe connection pool

git-svn-id: trunk@25248 -
This commit is contained in:
michael 2013-08-12 07:25:21 +00:00
parent d23a5b7c7d
commit ae19359a7c

View File

@ -55,11 +55,17 @@ type
FConnectString : string;
FIntegerDateTimes : boolean;
FVerboseErrors : Boolean;
FPool : TRTLCriticalSection;
procedure CheckConnectionStatus(var conn: PPGconn);
procedure CheckResultError(var res: PPGresult; conn:PPGconn; ErrMsg: string);
function TranslateFldType(res : PPGresult; Tuple : integer; out Size : integer) : TFieldType;
procedure ExecuteDirectPG(const Query : String);
protected
// Add connection to pool.
procedure AddConnection(T: TPQTranConnection);
// Release connection in pool.
procedure ReleaseConnection(Conn: PPGConn; DoClear : Boolean);
procedure DoInternalConnect; override;
procedure DoInternalDisconnect; override;
function GetHandle : pointer; override;
@ -86,6 +92,7 @@ type
function RowsAffected(cursor: TSQLCursor): TRowsCount; override;
public
constructor Create(AOwner : TComponent); override;
destructor destroy; override;
function GetConnectionInfo(InfoType:TConnInfoType): string; override;
procedure CreateDB; override;
procedure DropDB; override;
@ -160,6 +167,13 @@ begin
FConnOptions := FConnOptions + [sqSupportParams] + [sqEscapeRepeat] + [sqEscapeSlash];
FieldNameQuoteChars:=DoubleQuotes;
VerboseErrors:=True;
InitCriticalSection(FPool);
end;
destructor TPQConnection.destroy;
begin
DoneCriticalSection(FPool);
inherited destroy;
end;
procedure TPQConnection.CreateDB;
@ -174,7 +188,7 @@ begin
ExecuteDirectPG('DROP DATABASE ' +DatabaseName);
end;
procedure TPQConnection.ExecuteDirectPG(const query : string);
procedure TPQConnection.ExecuteDirectPG(const Query: String);
var ASQLDatabaseHandle : PPGConn;
res : PPGresult;
@ -207,6 +221,46 @@ begin
{$EndIf}
end;
procedure TPQConnection.AddConnection(T: TPQTranConnection);
Var
I : Integer;
begin
// make connection available in pool
EnterCriticalSection(FPool);
try
I:=Length(FConnectionPool);
SetLength(FConnectionPool,I+1);
FConnectionPool[i]:=T;
finally
EnterCriticalSection(FPool);
end;
end;
procedure TPQConnection.ReleaseConnection(Conn: PPGConn; DoClear: Boolean);
Var
I : Integer;
begin
// make connection available in pool
EnterCriticalSection(FPool);
try
for i:=0 to length(FConnectionPool)-1 do
if (FConnectionPool[i].FPGConn=Conn) then
begin
FConnectionPool[i].FTranActive:=false;
if DoClear then
FConnectionPool[i].FPGConn:=Nil;
break;
end;
finally
EnterCriticalSection(FPool);
end;
end;
function TPQConnection.GetTransactionHandle(trans : TSQLHandle): pointer;
begin
@ -228,13 +282,7 @@ begin
CheckResultError(res,tr.PGConn,SErrRollbackFailed);
PQclear(res);
//make connection available in pool
for i:=0 to length(FConnectionPool)-1 do
if FConnectionPool[i].FPGConn=tr.PGConn then
begin
FConnectionPool[i].FTranActive:=false;
break;
end;
ReleaseConnection(tr.PGCOnn,false);
result := true;
end;
@ -253,12 +301,7 @@ begin
PQclear(res);
//make connection available in pool
for i:=0 to length(FConnectionPool)-1 do
if FConnectionPool[i].FPGConn=tr.PGConn then
begin
FConnectionPool[i].FTranActive:=false;
break;
end;
ReleaseConnection(tr.PGConn,false);
result := true;
end;
@ -267,35 +310,45 @@ var
res : PPGresult;
tr : TPQTrans;
i : Integer;
t : TPQTranConnection;
begin
result:=false;
tr := trans as TPQTrans;
//find an unused connection in the pool
i:=0;
while i<length(FConnectionPool) do
if (FConnectionPool[i].FPGConn=nil) or not FConnectionPool[i].FTranActive then
break
else
EnterCriticalSection(FPool);
try
while i<length(FConnectionPool) do
begin
T:=FConnectionPool[i];
if (T.FPGConn=nil) or not T.FTranActive then
break
else
T:=Nil;
i:=i+1;
if i=length(FConnectionPool) then //create a new connection
end;
// set to active now, so when we exit critical section,
// it will be marked active and will not be found.
if Assigned(T) then
T.FTranActive:=true;
finally
LeaveCriticalSection(FPool);
end;
if (T=Nil) then
begin
T:=TPQTranConnection.Create;
T.FTranActive:=True;
AddConnection(T);
end;
if (T.FPGConn<>nil) then
tr.PGConn:=T.FPGConn
else
begin
tr.PGConn := PQconnectdb(pchar(FConnectString));
CheckConnectionStatus(tr.PGConn);
if CharSet <> '' then
PQsetClientEncoding(tr.PGConn, pchar(CharSet));
//store the new connection
SetLength(FConnectionPool,i+1);
FConnectionPool[i]:=TPQTranConnection.Create;
FConnectionPool[i].FPGConn:=tr.PGConn;
FConnectionPool[i].FTranActive:=true;
end
else //re-use existing connection
begin
tr.PGConn:=FConnectionPool[i].FPGConn;
FConnectionPool[i].FTranActive:=true;
end;
res := PQexec(tr.PGConn, 'BEGIN');
@ -339,7 +392,10 @@ end;
procedure TPQConnection.DoInternalConnect;
var ASQLDatabaseHandle : PPGConn;
var
ASQLDatabaseHandle : PPGConn;
T : TPQTranConnection;
begin
{$IfDef LinkDynamically}
InitialisePostgres3;
@ -365,24 +421,28 @@ begin
// This only works for pg>=8.0, so timestamps won't work with earlier versions of pg which are compiled with integer_datetimes on
if PQparameterStatus<>nil then
FIntegerDateTimes := PQparameterStatus(ASQLDatabaseHandle,'integer_datetimes') = 'on';
SetLength(FConnectionPool,1);
FConnectionPool[0]:=TPQTranConnection.Create;
FConnectionPool[0].FPGConn:=ASQLDatabaseHandle;
FConnectionPool[0].FTranActive:=false;
T:=TPQTranConnection.Create;
T.FPGConn:=ASQLDatabaseHandle;
T.FTranActive:=false;
AddConnection(T);
end;
procedure TPQConnection.DoInternalDisconnect;
var i:integer;
begin
Inherited;
for i:=0 to length(FConnectionPool)-1 do
begin
if assigned(FConnectionPool[i].FPGConn) then
PQfinish(FConnectionPool[i].FPGConn);
FConnectionPool[i].Free;
end;
Setlength(FConnectionPool,0);
EnterCriticalSection(FPool);
try
for i:=0 to length(FConnectionPool)-1 do
begin
if assigned(FConnectionPool[i].FPGConn) then
PQfinish(FConnectionPool[i].FPGConn);
FConnectionPool[i].Free;
end;
Setlength(FConnectionPool,0);
finally
LeaveCriticalSection(FPool);
end;
{$IfDef LinkDynamically}
ReleasePostgres3;
{$EndIf}
@ -396,13 +456,7 @@ begin
begin
sErr := PQerrorMessage(conn);
//make connection available in pool
for i:=0 to length(FConnectionPool)-1 do
if FConnectionPool[i].FPGConn=conn then
begin
FConnectionPool[i].FPGConn:=nil;
FConnectionPool[i].FTranActive:=false;
break;
end;
ReleaseConnection(Conn,True);
PQfinish(conn);
DatabaseError(sErrConnectionFailed + ' (PostgreSQL: ' + sErr + ')', Self);
end;
@ -463,14 +517,7 @@ begin
if assigned(conn) then
begin
PQFinish(conn);
//make connection available in pool
for i:=0 to length(FConnectionPool)-1 do
if FConnectionPool[i].FPGConn=conn then
begin
FConnectionPool[i].FPGConn:=nil;
FConnectionPool[i].FTranActive:=false;
break;
end;
ReleaseConnection(Conn,True);
end;
raise E;
end;
@ -549,18 +596,18 @@ begin
end;
end;
Function TPQConnection.AllocateCursorHandle : TSQLCursor;
function TPQConnection.AllocateCursorHandle: TSQLCursor;
begin
result := TPQCursor.create;
end;
Procedure TPQConnection.DeAllocateCursorHandle(var cursor : TSQLCursor);
procedure TPQConnection.DeAllocateCursorHandle(var cursor: TSQLCursor);
begin
FreeAndNil(cursor);
end;
Function TPQConnection.AllocateTransactionHandle : TSQLHandle;
function TPQConnection.AllocateTransactionHandle: TSQLHandle;
begin
result := TPQTrans.create;
@ -625,7 +672,7 @@ begin
if FStatementType in [stInsert,stUpdate,stDelete, stSelect] then
begin
StmtName := 'prepst'+inttostr(FCursorCount);
inc(FCursorCount);
InterlockedIncrement(FCursorCount);
tr := TPQTrans(aTransaction.Handle);
// Only available for pq 8.0, so don't use it...
// Res := pqprepare(tr,'prepst'+name+nr,pchar(buf),params.Count,pchar(''));