From 68c52cf02549c6d527b20f139f9233c823d0815f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C3=ABl=20Van=20Canneyt?= Date: Sun, 29 Jan 2023 21:26:24 +0100 Subject: [PATCH] * Refactored to be able to solve threadsafety problem --- .../fcl-db/src/sqldb/postgres/pqconnection.pp | 985 ++++++++++-------- 1 file changed, 566 insertions(+), 419 deletions(-) diff --git a/packages/fcl-db/src/sqldb/postgres/pqconnection.pp b/packages/fcl-db/src/sqldb/postgres/pqconnection.pp index a2567f7832..dbced449b7 100644 --- a/packages/fcl-db/src/sqldb/postgres/pqconnection.pp +++ b/packages/fcl-db/src/sqldb/postgres/pqconnection.pp @@ -31,18 +31,47 @@ uses type TPQCursor = Class; + TPQConnection = Class; - { TPQTrans } + { TPGHandle } + TCheckResultAction = (craClose,craClear); + TCheckResultActions = set of TCheckResultAction; - TPQTrans = Class(TSQLHandle) + TPGHandle = Class(TSQLHandle) + strict private + class var _HID : Int64; + private + FHandleID : Int64; + FConnected: Boolean; + FCOnnection: TPQConnection; + FDBName : String; + FActive : Boolean; + FUsed: Boolean; + function GetConnected: Boolean; protected - PGConn : PPGConn; - FList : TThreadList; + FNativeConn: PPGConn; + FCursorList : TThreadList; Procedure RegisterCursor(Cursor : TPQCursor); Procedure UnRegisterCursor(Cursor : TPQCursor); + procedure UnprepareStatement(Cursor: TPQCursor; Force: Boolean); + Procedure Connect; + Procedure Disconnect; + Procedure StartTransaction; + Procedure RollBack; + Procedure Commit; + Procedure Reset; + Function CheckConnectionStatus(doRaise : Boolean = True) : Boolean; + Function DescribePrepared(StmtName : String): PPGresult; + Function Exec(aSQL : String; aClearResult : Boolean; aError : String = '') : PPGresult; Public - Constructor Create; + Constructor Create(aConnection : TPQConnection;aDBName :string); Destructor Destroy; override; + procedure CheckResultError(var res: PPGresult; Actions : TCheckResultActions; const ErrMsg: string); + Property Connection : TPQConnection Read FCOnnection; + Property NativeConn : PPGConn Read FNativeConn; + Property Active : Boolean Read Factive; + Property Used : Boolean Read FUsed Write FUsed; + Property Connected : Boolean Read GetConnected; end; // TField and TFieldDef only support a limited amount of fields. @@ -60,19 +89,32 @@ type PFieldBinding = ^TFieldBinding; TFieldBindings = Array of TFieldBinding; + { TPQTransactionHandle } + + TPQTransactionHandle = Class(TSQLHandle) + strict private + FHandle: TPGHandle; + procedure SetHandle(AValue: TPGHandle); + Public + Property Handle : TPGHandle Read FHandle Write SetHandle; + end; + { TPQCursor } TPQCursor = Class(TSQLCursor) + private + procedure SetHandle(AValue: TPGhandle); protected Statement : string; StmtName : string; - tr : TPQTrans; + Fhandle : TPGHandle; res : PPGresult; CurTuple : integer; FieldBinding : TFieldBindings; Function GetFieldBinding(F : TFieldDef): PFieldBinding; Public Destructor Destroy; override; + Property Handle : TPGhandle Read FHandle Write SetHandle; end; { EPQDatabaseError } @@ -92,27 +134,18 @@ type CONSTRAINT_NAME: string; end; - { TPQTranConnection } - - TPQTranConnection = class - protected - FPGConn : PPGConn; - FTranActive : boolean; - end; { TPQConnection } TPQConnection = class (TSQLConnection) private - FConnectionPool : TThreadList; + FHandlePool : TThreadList; FCursorCount : dword; - FConnectString : string; FIntegerDateTimes : boolean; FVerboseErrors : Boolean; protected // Protected so they can be used by descendents. - procedure CheckConnectionStatus(var conn: PPGconn); - procedure CheckResultError(var res: PPGresult; conn:PPGconn; ErrMsg: string); + function GetConnectionString(const aDBName : String) : string; function TranslateFldType(res : PPGresult; Tuple : integer; out Size : integer; Out ATypeOID : oid) : TFieldType; procedure ExecuteDirectPG(const Query : String); Procedure GetExtendedFieldInfo(cursor: TPQCursor; Bindings : TFieldBindings); @@ -120,12 +153,10 @@ type procedure ApplyFieldUpdate(C : TSQLCursor; P: TSQLDBParam; F: TField; UseOldValue: Boolean); override; Function ErrorOnUnknownType : Boolean; // Add connection to pool. - procedure AddConnection(T: TPQTranConnection); - // Release connection in pool. - procedure ReleaseConnection(Conn: PPGConn; DoClear : Boolean); - + procedure AddHandle(T: TPGHandle); +{$IFNDEF VER3_2} function PortParamName: string; override; - +{$endif} procedure DoInternalConnect; override; procedure DoInternalDisconnect; override; function GetHandle : pointer; override; @@ -219,43 +250,128 @@ const Oid_Bool = 16; oid_numeric = 1700; Oid_uuid = 2950; +{ TPQTransactionHandle } -{ TPQTrans } - -constructor TPQTrans.Create; +procedure TPQTransactionHandle.SetHandle(AValue: TPGHandle); begin - FList:=TThreadList.Create; - FList.Duplicates:=dupIgnore; + if FHandle=AValue then Exit; + FHandle:=AValue; end; -destructor TPQTrans.Destroy; + +{ TPGHandle } + +constructor TPGHandle.Create(aConnection: TPQConnection; aDBName: string); +begin + FDBName:=aDBName; + FConnection:=aConnection; + FCursorList:=TThreadList.Create; + FCursorList.Duplicates:=dupIgnore; + FHandleID:=InterlockedIncrement64(_HID); + {$IFDEF PQDEBUG} + Writeln('>>> ',FHandleID,' [',TThread.CurrentThread.ThreadID, '] allocating handle '); + {$ENDIF} +end; + +destructor TPGHandle.Destroy; Var L : TList; I : integer; begin - L:=FList.LockList; + {$IFDEF PQDEBUG} + Writeln('>>> ',FHandleID,' [',TThread.CurrentThread.ThreadID, '] Destroying handle '); + {$ENDIF} + L:=FCursorList.LockList; try For I:=0 to L.Count-1 do - TPQCursor(L[i]).tr:=Nil; + TPQCursor(L[i]).Handle:=Nil; finally - FList.UnlockList; + FCursorList.UnlockList; end; - FreeAndNil(FList); + FreeAndNil(FCursorList); inherited Destroy; end; -procedure TPQTrans.RegisterCursor(Cursor: TPQCursor); +function TPGHandle.GetConnected: Boolean; begin - FList.Add(Cursor); - Cursor.tr:=Self; + Result:=FNativeConn<>Nil; end; -procedure TPQTrans.UnRegisterCursor(Cursor: TPQCursor); +procedure TPGHandle.RegisterCursor(Cursor: TPQCursor); begin - Cursor.tr:=Nil; - FList.Remove(Cursor); + if Cursor.handle=Self then + begin + {$IFDEF PQDEBUG} + Writeln('>>> ',FHandleID, ' [',TThread.CurrentThread.ThreadID, '] cursor ',PtrInt(Cursor),' already registered'); + {$ENDIF} + exit; + end; + {$IFDEF PQDEBUG} + Writeln('>>> ',FHandleID,' [',TThread.CurrentThread.ThreadID, '] registering cursor ',PtrInt(Cursor)); + {$ENDIF} + FCursorList.Add(Cursor); + Cursor.Handle:=Self; +end; + +procedure TPGHandle.UnRegisterCursor(Cursor: TPQCursor); + +Var + L : TList; + +begin + {$IFDEF PQDEBUG} + Writeln('>>> ',FHandleID,' [',TThread.CurrentThread.ThreadID, '] unregistering cursor ',PtrInt(Cursor)); + {$ENDIF} + Cursor.Handle:=Nil; + FCursorList.Remove(Cursor); + L:=FCursorList.LockList; + try + Used:=L.Count>0; + {$IFDEF PQDEBUG} + Writeln('>>> ',FHandleID,' [',TThread.CurrentThread.ThreadID, '] unregistering cursor ',PtrInt(Cursor),'. Handle still used: ',Used); + {$ENDIF} + finally + FCursorList.UnlockList; + end; +end; + +procedure TPGHandle.UnprepareStatement(Cursor: TPQCursor; Force : Boolean); +var + SQL : String; +begin + if Cursor.handle<>Self then + DatabaseError('Internal error: unpreparing in different transaction!'); + if Assigned(Cursor.res) then + PQclear(Cursor.res); + Cursor.res:=nil; + SQL:='deallocate '+Cursor.StmtName; + Cursor.StmtName:=''; + if Force then + Cursor.FPrepared := False; + if not Cursor.FPrepared then + Exit; + if (PQtransactionStatus(FNativeConn) <> PQTRANS_INERROR) then + begin + Exec(SQL,True,SErrUnPrepareFailed); + Cursor.FPrepared := False; + end; + UnregisterCursor(Cursor); +end; + +procedure TPGHandle.Connect; + +var + S : AnsiString; +begin + S:=Connection.GetConnectionString(FDBName); + FNativeConn:=PQconnectdb(PAnsiChar(S)); + CheckConnectionStatus; + FConnected:=True; + S:=Connection.CharSet; + if (S<>'') then + PQsetClientEncoding(FNativeConn,PAnsiChar(S)); end; @@ -263,11 +379,23 @@ end; destructor TPQCursor.Destroy; begin - if Assigned(tr) then - tr.UnRegisterCursor(Self); + if Assigned(Handle) then + Handle.UnRegisterCursor(Self); inherited Destroy; end; +procedure TPQCursor.SetHandle(AValue: TPGhandle); +begin + if FHandle=AValue then Exit; + if (FHandle<>Nil) and (aValue<>Nil) then + begin + {$IFDEF PQDEBUG} + writeln('>>> ',ptrint(Self),' [',TThread.CurrentThread.ThreadID, '] Setting handle while handle still valid'); + {$endif} + end; + FHandle:=AValue; +end; + function TPQCursor.GetFieldBinding(F: TFieldDef): PFieldBinding; Var @@ -301,14 +429,14 @@ begin FConnOptions := FConnOptions + [sqSupportParams, sqSupportEmptyDatabaseName, sqEscapeRepeat, sqEscapeSlash, sqImplicitTransaction,sqSupportReturning,sqSequences]; FieldNameQuoteChars:=DoubleQuotes; VerboseErrors:=True; - FConnectionPool:=TThreadlist.Create; + FHandlePool:=TThreadlist.Create; end; destructor TPQConnection.Destroy; begin // We must disconnect here. If it is done in inherited, then connection pool is gone. Connected:=False; - FreeAndNil(FConnectionPool); + FreeAndNil(FHandlePool); inherited destroy; end; @@ -326,32 +454,21 @@ end; procedure TPQConnection.ExecuteDirectPG(const Query: String); -var ASQLDatabaseHandle : PPGConn; - res : PPGresult; +var + AHandle : TPGHandle; begin CheckDisConnected; {$IfDef LinkDynamically} InitialisePostgres3; {$EndIf} - - FConnectString := ''; - if (UserName <> '') then FConnectString := FConnectString + ' user=''' + UserName + ''''; - if (Password <> '') then FConnectString := FConnectString + ' password=''' + Password + ''''; - if (HostName <> '') then FConnectString := FConnectString + ' host=''' + HostName + ''''; - FConnectString := FConnectString + ' dbname=''template1'''; - if (Params.Text <> '') then FConnectString := FConnectString + ' '+Params.Text; - - ASQLDatabaseHandle := PQconnectdb(pchar(FConnectString)); - - CheckConnectionStatus(ASQLDatabaseHandle); - - res := PQexec(ASQLDatabaseHandle,pchar(query)); - - CheckResultError(res,ASQLDatabaseHandle,SDBCreateDropFailed); - - PQclear(res); - PQFinish(ASQLDatabaseHandle); + aHandle:=TPGHandle.Create(Self,'template1'); + try + aHandle.Connect; + aHandle.Exec(Query,True,'Error executing query'); + finally + aHandle.Free; + end; {$IfDef LinkDynamically} ReleasePostgres3; {$EndIf} @@ -367,6 +484,7 @@ Var toid : oid; begin +s:=''; For I:=0 to Length(Bindings)-1 do if (Bindings[i].TypeOID>0) then begin @@ -376,10 +494,7 @@ begin end; if (S='') then exit; - S:='select oid,typname,typtype,typcategory from pg_type where oid in ('+S+') order by oid'; - Res:=PQExec(Cursor.tr.PGConn,PChar(S)); - if (PQresultStatus(res)<>PGRES_TUPLES_OK) then - CheckResultError(Res,Cursor.tr.PGConn,'Error getting type info'); + Res:=Cursor.Handle.Exec(S,False,'Error getting typeinfo'); try For I:=0 to PQntuples(Res)-1 do begin @@ -418,137 +533,76 @@ begin Result:=False; end; -procedure TPQConnection.AddConnection(T: TPQTranConnection); +procedure TPQConnection.AddHandle(T: TPGHandle); begin - FConnectionPool.Add(T); + FHandlePool.Add(T); end; -procedure TPQConnection.ReleaseConnection(Conn: PPGConn; DoClear: Boolean); - -Var - I : Integer; - L : TList; - T : TPQTranConnection; - -begin - L:=FConnectionPool.LockList; - // make connection available in pool - try - for i:=0 to L.Count-1 do - begin - T:=TPQTranConnection(L[i]); - if (T.FPGConn=Conn) then - begin - T.FTranActive:=false; - if DoClear then - T.FPGConn:=Nil; - break; - end; - end - finally - FConnectionPool.UnlockList; - end; -end; function TPQConnection.GetTransactionHandle(trans : TSQLHandle): pointer; begin - Result := trans; + if (trans is TPQTransactionHandle) then + Result:=Trans + else + DatabaseErrorFmt('Expected %s, got %s',[TPQTransactionHandle.ClassName,Trans.ClassName]); end; function TPQConnection.RollBack(trans : TSQLHandle) : boolean; + var - res : PPGresult; - tr : TPQTrans; - i : Integer; - L : TList; + tr : TPGHandle; begin - result := false; - tr := trans as TPQTrans; - // unprepare statements associated with given transaction - L:=tr.FList.LockList; - try - For I:=0 to L.Count-1 do - begin - UnprepareStatement(TPQCursor(L[i])); - TPQCursor(L[i]).tr:=Nil; - end; - L.Clear; - finally - tr.FList.UnlockList; - end; - - res := PQexec(tr.PGConn, 'ROLLBACK'); - CheckResultError(res,tr.PGConn,SErrRollbackFailed); - PQclear(res); - ReleaseConnection(tr.PGCOnn,false); + tr := (trans as TPQTransactionHandle).Handle as TPGHandle; + TR.RollBack; result := true; end; function TPQConnection.Commit(trans : TSQLHandle) : boolean; var - res : PPGresult; - tr : TPQTrans; + tr : TPGHandle; begin - result := false; - tr := trans as TPQTrans; - res := PQexec(tr.PGConn, 'COMMIT'); - CheckResultError(res,tr.PGConn,SErrCommitFailed); - PQclear(res); - //make connection available in pool - ReleaseConnection(tr.PGConn,false); - result := true; + tr := (trans as TPQTransactionHandle).Handle; + tr.Commit; + Result:=True; end; procedure TPQConnection.RollBackRetaining(trans : TSQLHandle); var - res : PPGresult; - tr : TPQTrans; + tr : TPGHandle; begin - tr := trans as TPQTrans; - res := PQexec(tr.PGConn, 'ROLLBACK'); - CheckResultError(res,tr.PGConn,SErrRollbackFailed); - - PQclear(res); - res := PQexec(tr.PGConn, 'BEGIN'); - CheckResultError(res,tr.PGConn,sErrTransactionFailed); - - PQclear(res); + tr := (trans as TPQTransactionHandle).Handle; + TR.RollBack; + TR.StartTransaction; end; procedure TPQConnection.CommitRetaining(trans : TSQLHandle); var - res : PPGresult; - tr : TPQTrans; + tr : TPGHandle; begin - tr := trans as TPQTrans; - res := PQexec(tr.PGConn, 'COMMIT'); - CheckResultError(res,tr.PGConn,SErrCommitFailed); - - PQclear(res); - res := PQexec(tr.PGConn, 'BEGIN'); - CheckResultError(res,tr.PGConn,sErrTransactionFailed); - - PQclear(res); + tr := (trans as TPQTransactionHandle).Handle as TPGHandle; + TR.Commit; + TR.StartTransaction; end; function TPQConnection.StartImplicitTransaction(trans : TSQLHandle; AParams : string) : boolean; var i : Integer; - T : TPQTranConnection; + T : TPGHandle; L : TList; + begin //find an unused connection in the pool i:=0; T:=Nil; - L:=FConnectionPool.LockList; + L:=FHandlePool.LockList; try while (i '' then - PQsetClientEncoding(T.FPGConn, pchar(CharSet)); + {$IFDEF PQDEBUG} + Writeln('>>> ',T.FHandleID,' [',TThread.CurrentThread.ThreadID, '] Reusing connection '); + {$ENDIF} + end; - TPQTrans(trans).PGConn := T.FPGConn; + if (Not T.Connected) then + T.Connect; + (Trans as TPQTransactionHandle).handle:=T; Result := true; end; @@ -585,98 +640,200 @@ function TPQConnection.StartDBTransaction(trans: TSQLHandle; AParams: string ): boolean; Var - res : PPGresult; - tr : TPQTrans; + tr : TPQTransactionHandle; begin Result:=StartImplicitTransaction(trans, AParams); if Result then begin - tr := trans as TPQTrans; - res := PQexec(tr.PGConn, 'BEGIN'); - CheckResultError(res,tr.PGConn,sErrTransactionFailed); - PQclear(res); + tr:= trans as TPQTransactionHandle; + tr.Handle.StartTransaction; end; end; procedure TPQConnection.DoInternalConnect; var - ASQLDatabaseHandle : PPGConn; - T : TPQTranConnection; + T : TPGHandle; begin {$IfDef LinkDynamically} InitialisePostgres3; {$EndIf} - inherited DoInternalConnect; - - FConnectString := ''; - if (UserName <> '') then FConnectString := FConnectString + ' user=''' + UserName + ''''; - if (Password <> '') then FConnectString := FConnectString + ' password=''' + Password + ''''; - if (HostName <> '') then FConnectString := FConnectString + ' host=''' + HostName + ''''; - if (DatabaseName <> '') then FConnectString := FConnectString + ' dbname=''' + DatabaseName + ''''; - if (Params.Text <> '') then FConnectString := FConnectString + ' '+Params.Text; - - ASQLDatabaseHandle := PQconnectdb(pchar(FConnectString)); + T:=TPGHandle.Create(Self,DatabaseName); try - CheckConnectionStatus(ASQLDatabaseHandle); + T.Connect; + T.Used:=false; + // This only works for pg>=8.0, so timestamps won't work with earlier versions of pg which are compiled with integer_datetimes on + if PQparameterStatus<>nil then + FIntegerDateTimes := PQparameterStatus(T.NativeConn,'integer_datetimes') = 'on'; except + T.Free; DoInternalDisconnect; raise; end; - - // This only works for pg>=8.0, so timestamps won't work with earlier versions of pg which are compiled with integer_datetimes on - if PQparameterStatus<>nil then - FIntegerDateTimes := PQparameterStatus(ASQLDatabaseHandle,'integer_datetimes') = 'on'; - T:=TPQTranConnection.Create; - T.FPGConn:=ASQLDatabaseHandle; - T.FTranActive:=false; - AddConnection(T); + AddHandle(T); end; procedure TPQConnection.DoInternalDisconnect; var i:integer; L : TList; - T : TPQTranConnection; + T : TPGHandle; begin Inherited; - L:=FConnectionPool.LockList; + L:=FHandlePool.LockList; try for i:=0 to L.Count-1 do begin - T:=TPQTranConnection(L[i]); - if assigned(T.FPGConn) then - PQfinish(T.FPGConn); + T:=TPGHandle(L[i]); + if T.Connected then + T.Disconnect; T.Free; end; L.Clear; finally - FConnectionPool.UnLockList; + FHandlePool.UnLockList; end; {$IfDef LinkDynamically} ReleasePostgres3; {$EndIf} end; -procedure TPQConnection.CheckConnectionStatus(var conn: PPGconn); -var sErr: string; +function TPQConnection.GetConnectionString(const aDBName : String): string; + + Procedure MaybeAdd(aName,aValue : String); + + begin + if aValue<>'' then + begin + if aName<>'' then + Result:=Result+' '+aName+'='''+aValue+'''' + else + Result:=result+' '+aValue; + end; + end; + begin - if (PQstatus(conn) = CONNECTION_BAD) then - begin - sErr := PQerrorMessage(conn); - //make connection available in pool - ReleaseConnection(Conn,True); - PQfinish(conn); - DatabaseError(sErrConnectionFailed + ' (PostgreSQL: ' + sErr + ')', Self); - end; + Result:=''; + MaybeAdd('user',UserName); + MaybeAdd('password',Password); + MaybeAdd('host',HostName); + MaybeAdd('dbname',aDBName); + MaybeAdd('',Params.Text); end; -procedure TPQConnection.CheckResultError(var res: PPGresult; conn: PPGconn; - ErrMsg: string); +procedure TPGHandle.Disconnect; + +Var + PG : PPGconn; + +begin + if FNativeConn=Nil then + DatabaseError('Not connected to Postgres Server'); + PG:=FNativeConn; + {$IFDEF PQDEBUG} + Writeln('>>> ',FHandleID,' [',TThread.CurrentThread.ThreadID, '] ,Finishing connection'); + {$ENDIF} + FNativeConn:=Nil; + PQFinish(PG); +end; + +procedure TPGHandle.StartTransaction; +begin + Exec('BEGIN',True,sErrTransactionFailed); + FActive:=True; +end; + +procedure TPGHandle.RollBack; + +Var + L : TList; + I : Integer; + C : TPQCursor; + +begin + if not Active then + Exit; + // unprepare statements associated with given transaction + L:=FCursorList.LockList; + try + For I:=0 to L.Count-1 do + begin + C:=TPQCursor(L[i]); + UnprepareStatement(C,False); + end; + L.Clear; + finally + FCursorList.UnlockList; + end; + FActive:=False; + Exec('ROLLBACK',True,SErrRollbackFailed); +end; + +procedure TPGHandle.Commit; + +begin + Exec('COMMIT',True,SErrCommitFailed); +end; + +procedure TPGHandle.Reset; +begin + {$IFDEF PQDEBUG} + Writeln('>>> ',FHandleID,' [',TThread.CurrentThread.ThreadID, '] : Resetting'); + {$ENDIF} + PQReset(FNativeConn); +end; + +function TPGHandle.CheckConnectionStatus(doRaise: Boolean): Boolean; + +var sErr: string; + +begin + Result:=False; + if (PQstatus(FNativeConn) <> CONNECTION_BAD) then + Exit(True); + sErr := PQerrorMessage(FNativeConn); + //make connection available in pool + Disconnect; + if DoRaise then + DatabaseError(sErrConnectionFailed + ' (PostgreSQL: ' + sErr + ')'); +end; + +function TPGHandle.DescribePrepared(StmtName: String): PPGresult; + +Var + S : AnsiString; + +begin + S:=StmtName; + Result:=PQdescribePrepared(FNativeConn,pchar(S)); +end; + +function TPGHandle.Exec(aSQL: String; aClearResult: Boolean; aError: String): PPGresult; + +Var + S : UTF8String; + Acts : TCheckResultActions; + +begin + if FNativeConn=Nil then + DatabaseError(IntToStr(FHandleID)+': No native PQ connection available'); + CheckConnectionStatus(); + S:=aSQL; + {$IFDEF PQDEBUG} + Writeln('>>> ',FHandleID,' [',TThread.CurrentThread.ThreadID, '] exec: ',S); + {$ENDIF} + Result:=PQexec(FNativeConn,PAnsiChar(S)); + acts:=[]; + if aClearResult then + include(acts,craClear); + CheckResultError(Result,acts,aError); +end; + +procedure TPGHandle.CheckResultError(var res: PPGresult; Actions: TCheckResultActions; const ErrMsg: string); Procedure MaybeAdd(Var S : String; Prefix,Msg : String); @@ -703,19 +860,26 @@ var CONSTRAINT_NAME: string; P : Pchar; haveError : Boolean; + lMessage : String; begin + lMessage:=ErrMsg; HaveError:=False; if (Res=Nil) then begin + {$IFDEF PQDEBUG} + Writeln('>>> ',FHandleID,' [',TThread.CurrentThread.ThreadID, '] nil result'); + {$ENDIF} HaveError:=True; - P:=PQerrorMessage(conn); + P:=PQerrorMessage(FNativeConn); If Assigned(p) then - ErrMsg:=StrPas(P); + lMessage:=lMessage+StrPas(P); + Reset; end - else if (PQresultStatus(res) <> PGRES_COMMAND_OK) then + else if Not (PQresultStatus(res) in [PGRES_COMMAND_OK,PGRES_TUPLES_OK]) then begin HaveError:=True; + {$IFNDEF VER3_2} SEVERITY:=PQresultErrorField(res,PG_DIAG_SEVERITY); SQLSTATE:=PQresultErrorField(res,PG_DIAG_SQLSTATE); MESSAGE_PRIMARY:=PQresultErrorField(res,PG_DIAG_MESSAGE_PRIMARY); @@ -727,9 +891,9 @@ begin COLUMN_NAME:=PQresultErrorField(res,PG_DIAG_COLUMN_NAME); DATATYPE_NAME:=PQresultErrorField(res,PG_DIAG_DATATYPE_NAME); CONSTRAINT_NAME:=PQresultErrorField(res,PG_DIAG_CONSTRAINT_NAME); - + {$ENDIF} sErr:=PQresultErrorMessage(res); - if VerboseErrors then + if Connection.VerboseErrors then begin MaybeAdd(sErr,'Severity',SEVERITY); MaybeAdd(sErr,'SQL State',SQLSTATE); @@ -746,8 +910,11 @@ begin end; if HaveError then begin - if (Self.Name='') then CompName := Self.ClassName else CompName := Self.Name; - E:=EPQDatabaseError.CreateFmt('%s : %s (PostgreSQL: %s)', [CompName, ErrMsg, sErr]); + if Assigned(Connection) then + CompName := Connection.Name; + if CompName='' then + CompName:=FDBName; + E:=EPQDatabaseError.CreateFmt('%s : %s (PostgreSQL: %s)', [CompName, lMessage, sErr]); E.SEVERITY:=SEVERITY; E.SQLSTATE:=SQLSTATE; E.MESSAGE_PRIMARY:=MESSAGE_PRIMARY; @@ -762,11 +929,11 @@ begin PQclear(res); res:=nil; - if assigned(conn) then - begin - PQFinish(conn); - ReleaseConnection(Conn,True); - end; + if craClose in Actions then + Disconnect; + {$IFDEF PQDEBUG} + Writeln('>>> ',IntToStr(FHandleID)+' [',TThread.CurrentThread.ThreadID, '] Error: ',lMessage,' - ',Serr); + {$ENDIF} raise E; end; end; @@ -867,7 +1034,7 @@ end; function TPQConnection.AllocateTransactionHandle: TSQLHandle; begin - result := TPQTrans.create; + result := TPQTransactionHandle.Create; end; procedure TPQConnection.PrepareStatement(cursor: TSQLCursor;ATransaction : TSQLTransaction;buf : string; AParams : TParams); @@ -913,7 +1080,9 @@ const TypeStrings : array[TFieldType] of string = 'Unknown', // ftTimeStamp 'numeric', // ftFMTBcd 'Unknown', // ftFixedWideChar - 'Unknown', // ftWideMemo + 'Unknown' // ftWideMemo + {$IFNDEF VER3_2} + , 'Unknown', // ftOraTimeStamp 'Unknown', // ftOraInterval 'Unknown', // ftLongWord @@ -921,6 +1090,7 @@ const TypeStrings : array[TFieldType] of string = 'Unknown', // ftByte 'Unknown', // ftExtended 'real' // ftSingle +{$ENDIF} ); @@ -929,108 +1099,94 @@ var i : integer; P : TParam; PQ : TSQLDBParam; + PQCurs : TPQCursor; begin - with (cursor as TPQCursor) do + PQCurs:=cursor as TPQCursor; + PQCurs.FPrepared := False; + PQCurs.FDirect := False; + // Prior to v8 there is no support for cursors and parameters. + // So that's not supported. + if PQCurs.FStatementType in [stInsert,stUpdate,stDelete, stSelect] then begin - FPrepared := False; - FDirect := False; - // Prior to v8 there is no support for cursors and parameters. - // So that's not supported. - if FStatementType in [stInsert,stUpdate,stDelete, stSelect] then - begin - StmtName := 'prepst'+inttostr(FCursorCount); - InterlockedIncrement(FCursorCount); - TPQTrans(aTransaction.Handle).RegisterCursor(Cursor as TPQCursor); + PQCurs.StmtName := 'prepst'+inttostr(FCursorCount); + InterlockedIncrement(FCursorCount); + if PQCurs.Handle=Nil then + (TObject(aTransaction.Handle) as TPQTransactionHandle).Handle.RegisterCursor(PQCurs); - // Only available for pq 8.0, so don't use it... - // Res := pqprepare(tr,'prepst'+name+nr,pchar(buf),params.Count,pchar('')); - s := 'prepare '+StmtName+' '; - if Assigned(AParams) and (AParams.Count > 0) then + // Only available for pq 8.0, so don't use it... + // Res := pqprepare(tr,'prepst'+name+nr,pchar(buf),params.Count,pchar('')); + s := 'prepare '+PQCurs.StmtName+' '; + if Assigned(AParams) and (AParams.Count > 0) then + begin + s := s + '('; + for i := 0 to AParams.Count-1 do begin - s := s + '('; - for i := 0 to AParams.Count-1 do + P:=AParams[i]; + If (P is TSQLDBParam) then + PQ:=TSQLDBParam(P) + else + PQ:=Nil; + TS:=TypeStrings[P.DataType]; + if (TS<>'Unknown') then begin - P:=AParams[i]; - If (P is TSQLDBParam) then - PQ:=TSQLDBParam(P) - else - PQ:=Nil; - TS:=TypeStrings[P.DataType]; - if (TS<>'Unknown') then + If Assigned(PQ) + and Assigned(PQ.SQLDBData) + and (PFieldBinding(PQ.SQLDBData)^.ExtendedFieldType=eftEnum) then + ts:='unknown'; + s := s + ts + ',' + end + else + begin + if P.DataType = ftUnknown then begin - If Assigned(PQ) - and Assigned(PQ.SQLDBData) - and (PFieldBinding(PQ.SQLDBData)^.ExtendedFieldType=eftEnum) then - ts:='unknown'; - s := s + ts + ',' + if P.IsNull then + s:=s+' unknown ,' + else + DatabaseErrorFmt(SUnknownParamFieldType,[P.Name],self) end else - begin - if P.DataType = ftUnknown then - begin - if P.IsNull then - s:=s+' unknown ,' - else - DatabaseErrorFmt(SUnknownParamFieldType,[P.Name],self) - end - else - DatabaseErrorFmt(SUnsupportedParameter,[Fieldtypenames[P.DataType]],self); - end; + DatabaseErrorFmt(SUnsupportedParameter,[Fieldtypenames[P.DataType]],self); end; - s[length(s)] := ')'; - buf := AParams.ParseSQL(buf,false,sqEscapeSlash in ConnOptions, sqEscapeRepeat in ConnOptions,psPostgreSQL); end; - s := s + ' as ' + buf; - if LogEvent(detActualSQL) then - Log(detActualSQL,S); - res := PQexec(tr.PGConn,pchar(s)); - CheckResultError(res,nil,SErrPrepareFailed); - // if statement is INSERT, UPDATE, DELETE with RETURNING clause, then - // override the statement type derrived by parsing the query. - if (FStatementType in [stInsert,stUpdate,stDelete]) and (pos('RETURNING', upcase(s)) > 0) then - begin - PQclear(res); - res := PQdescribePrepared(tr.PGConn,pchar(StmtName)); - if (PQresultStatus(res) = PGRES_COMMAND_OK) and (PQnfields(res) > 0) then - FStatementType := stSelect; - end; - FPrepared := True; - end - else - begin - if Assigned(AParams) then - Statement := AParams.ParseSQL(buf,false,sqEscapeSlash in ConnOptions, sqEscapeRepeat in ConnOptions,psPostgreSQL) - else - Statement:=Buf; - FDirect:=True; + s[length(s)] := ')'; + buf := AParams.ParseSQL(buf,false,sqEscapeSlash in ConnOptions, sqEscapeRepeat in ConnOptions,psPostgreSQL); end; + s := s + ' as ' + buf; + if LogEvent(detActualSQL) then + Log(detActualSQL,S); + PQCurs.Res:=PQCurs.Handle.Exec(S,False,SErrPrepareFailed); + // if statement is INSERT, UPDATE, DELETE with RETURNING clause, then + // override the statement type derived by parsing the query. + if (PQCurs.FStatementType in [stInsert,stUpdate,stDelete]) and (pos('RETURNING', upcase(s)) > 0) then + begin + PQclear(PQCurs.res); + PQCurs.res:=PQCurs.Handle.DescribePrepared(PQCurs.StmtName); + if (PQresultStatus(PQCurs.res) = PGRES_COMMAND_OK) and (PQnfields(PQCurs.res) > 0) then + PQCurs.FStatementType := stSelect; + end; + PQCurs.FPrepared := True; + end + else + begin + if Assigned(AParams) then + PQCurs.Statement := AParams.ParseSQL(buf,false,sqEscapeSlash in ConnOptions, sqEscapeRepeat in ConnOptions,psPostgreSQL) + else + PQCurs.Statement:=Buf; + PQCurs.FDirect:=True; end; + end; procedure TPQConnection.UnPrepareStatement(cursor : TSQLCursor); + +Var + C : TPQCursor; + begin - with (cursor as TPQCursor) do - begin - PQclear(res); - res:=nil; - if ForcedClose then - begin - FPrepared := False; - exit; - end; - if FPrepared then - begin - if assigned(tr) and (PQtransactionStatus(tr.PGConn) <> PQTRANS_INERROR) then - begin - res := PQexec(tr.PGConn,pchar('deallocate '+StmtName)); - CheckResultError(res,nil,SErrUnPrepareFailed); - PQclear(res); - res:=nil; - end; - FPrepared := False; - end; - end; + C:=Cursor as TPQCursor; + if Assigned(C.Handle) then + C.Handle.UnPrepareStatement(C,ForcedClose); end; procedure TPQConnection.Execute(cursor: TSQLCursor;atransaction:tSQLtransaction;AParams : TParams); @@ -1044,6 +1200,7 @@ var ar : array of PAnsiChar; ParamNames, ParamValues : array of string; cash: int64; + PQCurs : TPQCursor; function FormatTimeInterval(Time: TDateTime): string; // supports Time >= '24:00:00' var hour, minute, second, millisecond: word; @@ -1053,115 +1210,104 @@ var ar : array of PAnsiChar; end; begin - with cursor as TPQCursor do + ar:=[]; + ParamNames:=[]; + ParamValues:=[]; + Lengths:=[]; + Formats:=[]; + PQCurs:=cursor as TPQCursor; + PQCurs.CurTuple:=-1; + PQclear(PQCurs.res); + if PQCurs.FStatementType in [stInsert,stUpdate,stDelete,stSelect] then begin - CurTuple:=-1; - PQclear(res); - if FStatementType in [stInsert,stUpdate,stDelete,stSelect] then + if LogEvent(detParamValue) then + LogParams(AParams); + if Assigned(AParams) and (AParams.Count > 0) then begin - if LogEvent(detParamValue) then - LogParams(AParams); - if Assigned(AParams) and (AParams.Count > 0) then + l:=AParams.Count; + setlength(ar,l); + setlength(lengths,l); + setlength(formats,l); + for i := 0 to AParams.Count -1 do if not AParams[i].IsNull then begin - l:=AParams.Count; - setlength(ar,l); - setlength(lengths,l); - setlength(formats,l); - for i := 0 to AParams.Count -1 do if not AParams[i].IsNull then - begin - handled:=False; - case AParams[i].DataType of - ftDateTime: - s := FormatDateTime('yyyy"-"mm"-"dd hh":"nn":"ss.zzz', AParams[i].AsDateTime); - ftDate: - s := FormatDateTime('yyyy"-"mm"-"dd', AParams[i].AsDateTime); - ftTime: - s := FormatTimeInterval(AParams[i].AsDateTime); - ftFloat: - Str(AParams[i].AsFloat, s); - ftBCD: - Str(AParams[i].AsCurrency, s); - ftCurrency: - begin - cash:=NtoBE(round(AParams[i].AsCurrency*100)); - setlength(s, sizeof(cash)); - Move(cash, s[1], sizeof(cash)); - end; - ftFmtBCD: - s := BCDToStr(AParams[i].AsFMTBCD, FSQLFormatSettings); - ftBlob, ftGraphic, ftVarBytes: - begin - Handled:=true; - bd:= AParams[i].AsBlob; - l:=length(BD); - if l>0 then - begin - GetMem(ar[i],l+1); - ar[i][l]:=#0; - Move(BD[0],ar[i]^, L); - lengths[i]:=l; - end; - end - else - s := GetAsString(AParams[i]); - end; {case} - if not handled then + handled:=False; + case AParams[i].DataType of + ftDateTime: + s := FormatDateTime('yyyy"-"mm"-"dd hh":"nn":"ss.zzz', AParams[i].AsDateTime); + ftDate: + s := FormatDateTime('yyyy"-"mm"-"dd', AParams[i].AsDateTime); + ftTime: + s := FormatTimeInterval(AParams[i].AsDateTime); + ftFloat: + Str(AParams[i].AsFloat, s); + ftBCD: + Str(AParams[i].AsCurrency, s); + ftCurrency: begin - l:=length(s); - GetMem(ar[i],l+1); - StrMove(PAnsiChar(ar[i]), PAnsiChar(s), L+1); - lengths[i]:=L; + cash:=NtoBE(round(AParams[i].AsCurrency*100)); + setlength(s, sizeof(cash)); + Move(cash, s[1], sizeof(cash)); end; - if (AParams[i].DataType in [ftBlob,ftMemo,ftGraphic,ftCurrency,ftVarBytes]) then - Formats[i]:=1 + ftFmtBCD: + s := BCDToStr(AParams[i].AsFMTBCD, FSQLFormatSettings); + ftBlob, ftGraphic, ftVarBytes: + begin + Handled:=true; + bd:= AParams[i].AsBlob; + l:=length(BD); + if l>0 then + begin + GetMem(ar[i],l+1); + ar[i][l]:=#0; + Move(BD[0],ar[i]^, L); + lengths[i]:=l; + end; + end else - Formats[i]:=0; - end + s := GetAsString(AParams[i]); + end; {case} + if not handled then + begin + l:=length(s); + GetMem(ar[i],l+1); + StrMove(PAnsiChar(ar[i]), PAnsiChar(s), L+1); + lengths[i]:=L; + end; + if (AParams[i].DataType in [ftBlob,ftMemo,ftGraphic,ftCurrency,ftVarBytes]) then + Formats[i]:=1 else - FreeAndNil(ar[i]); - res := PQexecPrepared(tr.PGConn,pchar(StmtName),AParams.Count,@Ar[0],@Lengths[0],@Formats[0],1); - for i := 0 to AParams.Count -1 do - FreeMem(ar[i]); + Formats[i]:=0; end else - res := PQexecPrepared(tr.PGConn,pchar(StmtName),0,nil,nil,nil,1); + FreeAndNil(ar[i]); + PQCurs.res := PQexecPrepared(PQCurs.Handle.NativeConn,pchar(PQCurs.StmtName),AParams.Count,@Ar[0],@Lengths[0],@Formats[0],1); + for i := 0 to AParams.Count -1 do + FreeMem(ar[i]); end else + PQCurs.res := PQexecPrepared(PQCurs.Handle.NativeConn,pchar(PQCurs.StmtName),0,nil,nil,nil,1); + end + else + begin + if PQCurs.handle=Nil then + (TObject(aTransaction.Handle) as TPQTransactionHandle).Handle.RegisterCursor(PQCurs); + if Assigned(AParams) and (AParams.Count > 0) then begin - // RegisterCursor sets tr - TPQTrans(aTransaction.Handle).RegisterCursor(Cursor as TPQCursor); - - if Assigned(AParams) and (AParams.Count > 0) then + setlength(ParamNames,AParams.Count); + setlength(ParamValues,AParams.Count); + for i := 0 to AParams.Count -1 do begin - setlength(ParamNames,AParams.Count); - setlength(ParamValues,AParams.Count); - for i := 0 to AParams.Count -1 do - begin - ParamNames[AParams.Count-i-1] := '$'+inttostr(AParams[i].index+1); - ParamValues[AParams.Count-i-1] := GetAsSQLText(AParams[i]); - end; - s := stringsreplace(Statement,ParamNames,ParamValues,[rfReplaceAll]); - end - else - s := Statement; - res := PQexec(tr.PGConn,pchar(s)); - if (PQresultStatus(res) in [PGRES_COMMAND_OK]) then - begin - PQclear(res); - res:=nil; + ParamNames[AParams.Count-i-1] := '$'+inttostr(AParams[i].index+1); + ParamValues[AParams.Count-i-1] := GetAsSQLText(AParams[i]); end; - end; - - if assigned(res) and not (PQresultStatus(res) in [PGRES_COMMAND_OK,PGRES_TUPLES_OK]) then - begin - // Don't perform the rollback, only make it possible to do a rollback. - // The other databases also don't do this. - //atransaction.Rollback; - CheckResultError(res,nil,SErrExecuteFailed); - end; - - FSelectable := assigned(res) and (PQresultStatus(res)=PGRES_TUPLES_OK); + s := stringsreplace(PQCurs.Statement,ParamNames,ParamValues,[rfReplaceAll]); + end + else + s := PQCurs.Statement; + PQCurs.Res:=Nil; + PQCurs.Res:=PQCurs.Handle.Exec(S,False,SErrExecuteFailed); end; + PQCurs.FSelectable := assigned(PQCurs.res) and (PQresultStatus(PQCurs.res)=PGRES_TUPLES_OK); end; @@ -1233,38 +1379,37 @@ function TPQConnection.GetHandle: pointer; var i:integer; L : TList; - T : TPQTranConnection; + T : TPGHandle; begin result:=nil; if not Connected then exit; //Get any handle that is (still) connected - L:=FConnectionPool.LockList; + L:=FHandlePool.LockList; try I:=L.Count-1; While (I>=0) and (Result=Nil) do begin - T:=TPQTranConnection(L[i]); - if assigned(T.FPGConn) and (PQstatus(T.FPGConn)<>CONNECTION_BAD) then - Result:=T.FPGConn; + T:=TPGHandle(L[i]); + if T.Connected and T.CheckConnectionStatus(False) then + Result:=T; Dec(I); end; finally - FConnectionPool.UnLockList; + FHandlePool.UnLockList; end; if Result<>Nil then exit; //Nothing connected!! Reconnect // T is element 0 after loop - if assigned(T.FPGConn) then - PQreset(T.FPGConn) - else - T.FPGConn := PQconnectdb(pchar(FConnectString)); - CheckConnectionStatus(T.FPGConn); - if CharSet <> '' then - PQsetClientEncoding(T.FPGConn, pchar(CharSet)); - result:=T.FPGConn; + if Not Assigned(T) then + begin + T:=TPGHandle.Create(Self,DatabaseName); + AddHandle(T); + end; + T.Connect; + Result:=T; end; @@ -1469,10 +1614,12 @@ begin end; end; +{$IFNDEF VER3_2} function TPQConnection.PortParamName: string; begin Result := 'port'; end; +{$ENDIF} procedure TPQConnection.UpdateIndexDefs(IndexDefs : TIndexDefs;TableName : string);