IDE: synchronize advancedipc.pp with FCL.

git-svn-id: trunk@50133 -
This commit is contained in:
ondrej 2015-10-21 08:24:05 +00:00
parent 250c06913b
commit ebbf7c90c3

View File

@ -19,7 +19,7 @@
**********************************************************************}
unit advancedipc;
unit AdvancedIPC;
{$mode objfpc}
{$H+}
@ -73,12 +73,13 @@ type
function RequestFileNameToID(const aFileName: string): Integer;
function RequestExists(const aRequestFileName: string): Boolean;
function GetUniqueRequest(out outFileName: string): Integer;
procedure SetServerID(const aServerID: string); virtual;
procedure SetGlobal(const aGlobal: Boolean); virtual;
function CanReadMessage(const aFileName: string; out outStream: TStream; out outMsgType: TMessageType; out outMsgLen: Integer): Boolean;
procedure DoPostMessage(const aFileName: string; const aMsgType: TMessageType; const aStream: TStream);
procedure DoPostMessage(const aFileName: string; const aMsgType: TMessageType; const aStream: TStream); overload;
procedure DoPostMessage(const aFileStream: TFileStream; const aMsgType: TMessageType; const aStream: TStream); overload;
function DoReadMessage(const aFileName: string; const aStream: TStream; out outMsgType: TMessageType): Boolean;
property FileName: string read FFileName;
public
@ -86,9 +87,9 @@ type
const outServerIDs: TStrings; const aGlobal: Boolean = False);
class function ServerRunning(const aServerID: string; const aGlobal: Boolean = False): Boolean; overload;
public
//ServerID: name/ID of the server. Use only ['a'..'z', 'A'..'Z', '_'] characters
//ServerID: name/ID of the server. Use only ['a'..'z', 'A'..'Z', '0'..'9', '_'] characters
property ServerID: string read FServerID write SetServerID;
//Global: if true, processes from different users can communicate; false, processes only from current users can communicate
//Global: if true, processes from different users can communicate; false, processes only from current user can communicate
property Global: Boolean read FGlobal write SetGlobal;
//MessageVersion: only messages with the same MessageVersion can be delivered between server/client
property MessageVersion: Integer read FMessageVersion write FMessageVersion;
@ -96,7 +97,12 @@ type
TIPCClient = class(TIPCBase)
private
FLastMsgFileName: string;
FLastRequestID: Integer;
function CreateUniqueRequest(out outFileStream: TFileStream): Integer;
function DoPeekResponse(const aResponseFileName: string; const aStream: TStream; out outMsgType: TMessageType; const aTimeOut: Integer): Boolean;
public
constructor Create(aOwner: TComponent); override;
public
//post request to server, do not wait until request is peeked; returns request ID
function PostRequest(const aMsgType: TMessageType; const aStream: TStream): Integer;
@ -104,9 +110,15 @@ type
function SendRequest(const aMsgType: TMessageType; const aStream: TStream; const aTimeOut: Integer): Boolean;
function SendRequest(const aMsgType: TMessageType; const aStream: TStream; const aTimeOut: Integer; out outRequestID: Integer): Boolean;
//peek a response from last request from this client
function PeekResponse(const aStream: TStream; out outMsgType: TMessageType; const aTimeOut: Integer): Boolean;
//delete last request from this client
procedure DeleteRequest;
function PeekResponse(const aStream: TStream; out outMsgType: TMessageType): Boolean; overload;
function PeekResponse(const aStream: TStream; out outMsgType: TMessageType; const aTimeOut: Integer): Boolean; overload;
//peek a response from request by ID
function PeekResponse(const aRequestID: Integer; const aStream: TStream; out outMsgType: TMessageType): Boolean; overload;
function PeekResponse(const aRequestID: Integer; const aStream: TStream; out outMsgType: TMessageType; const aTimeOut: Integer): Boolean; overload;
//delete last request from this client, returns true if request file existed and was deleted
function DeleteRequest: Boolean; overload;
//delete request by ID, returns true if request existed file and was deleted
function DeleteRequest(const aRequestID: Integer): Boolean; overload;
//check if server is running
function ServerRunning: Boolean; overload;
end;
@ -135,8 +147,8 @@ type
function PeekRequest(out outRequestID: Integer; out outMsgType: TMessageType; const aTimeOut: Integer): Boolean; overload;
//read a peeked request (that hasn't been read yet)
function ReadRequest(const aRequestID: Integer; const aStream: TStream): Boolean;
//delete a peeked request (that hasn't been read yet)
procedure DeleteRequest(const aRequestID: Integer);
//delete a peeked request (that hasn't been read yet), returns true if request file existed and was deleted
function DeleteRequest(const aRequestID: Integer): Boolean;
//post response to a request
procedure PostResponse(const aRequestID: Integer; const aMsgType: TMessageType; const aStream: TStream);
@ -167,6 +179,9 @@ resourcestring
implementation
type
TIPCSearchRec = {$IF FPC_FULLVERSION>=20701}TRawByteSearchRec{$ELSE}TSearchRec{$ENDIF};
const
{$IFDEF UNIX}
GLOBAL_RIGHTS = S_IRUSR or S_IWUSR or S_IRGRP or S_IWGRP or S_IROTH or S_IWOTH;
@ -174,6 +189,9 @@ const
GLOBAL_RIGHTS = 0;
{$ENDIF}
var
CreateUniqueRequestCritSec: TRTLCriticalSection;
{ TIPCBase }
function TIPCBase.CanReadMessage(const aFileName: string; out
@ -218,15 +236,27 @@ begin
outMsgLen := xHeader.MsgLen;
end;
function TIPCBase.GetUniqueRequest(out outFileName: string): Integer;
function TIPCBase.DoReadMessage(const aFileName: string;
const aStream: TStream; out outMsgType: TMessageType): Boolean;
var
xStream: TStream;
xMsgLen: Integer;
begin
Randomize;
repeat
//if Randomize/Random is started from 2 processes at exactly same moment, it returns the same number! -> prevent duplicates by xor GetCurrentThreadId
//the result must be of range 0..$7FFFFFFF (High(Integer))
Result := Integer((PtrInt(Random($7FFFFFFF)) xor {%H-}PtrInt(GetCurrentThreadId)) and $7FFFFFFF);
outFileName := GetRequestFileName(Result);
until not RequestExists(outFileName);
aStream.Size := 0;
xStream := nil;
try
Result := CanReadMessage(aFileName, xStream, outMsgType, xMsgLen);
if Result then
begin
if xMsgLen > 0 then
aStream.CopyFrom(xStream, xMsgLen);
FreeAndNil(xStream);
aStream.Position := 0;
DeleteFile(aFileName);
end;
finally
xStream.Free;
end;
end;
function TIPCBase.RequestExists(const aRequestFileName: string): Boolean;
@ -271,8 +301,20 @@ end;
procedure TIPCBase.DoPostMessage(const aFileName: string;
const aMsgType: TMessageType; const aStream: TStream);
var
xHeader: TMessageHeader;
xStream: TFileStream;
begin
xStream := TFileStream.Create(aFileName, fmCreate or fmShareExclusive, GLOBAL_RIGHTS);
try
DoPostMessage(xStream, aMsgType, aStream);
finally
xStream.Free;
end;
end;
procedure TIPCBase.DoPostMessage(const aFileStream: TFileStream;
const aMsgType: TMessageType; const aStream: TStream);
var
xHeader: TMessageHeader;
begin
xHeader.HeaderVersion := HEADER_VERSION;
xHeader.FileLock := 1;//locking
@ -283,18 +325,14 @@ begin
xHeader.MsgLen := 0;
xHeader.MsgVersion := MessageVersion;
xStream := TFileStream.Create(aFileName, fmCreate or fmShareExclusive, GLOBAL_RIGHTS);
try
xStream.WriteBuffer(xHeader, SizeOf(xHeader));
if Assigned(aStream) and (aStream.Size-aStream.Position > 0) then
xStream.CopyFrom(aStream, aStream.Size-aStream.Position);
aFileStream.WriteBuffer(xHeader, SizeOf(xHeader));
if Assigned(aStream) and (aStream.Size-aStream.Position > 0) then
aFileStream.CopyFrom(aStream, aStream.Size-aStream.Position);
xStream.Position := 0;//unlocking
xHeader.FileLock := 0;
xStream.WriteBuffer(xHeader, SizeOf(xHeader));
finally
xStream.Free;
end;
aFileStream.Position := 0;//unlocking
xHeader.FileLock := 0;
aFileStream.WriteBuffer(xHeader, SizeOf(xHeader));
aFileStream.Seek(0, soEnd);
end;
function TIPCBase.RequestFileNameToID(const aFileName: string): Integer;
@ -310,9 +348,9 @@ end;
class procedure TIPCBase.FindRunningServers(const aServerIDPrefix: string;
const outServerIDs: TStrings; const aGlobal: Boolean);
var
xRec: {$IF FPC_FULLVERSION>=20701}TRawByteSearchRec{$ELSE}TSearchRec{$ENDIF};
xRec: TIPCSearchRec;
begin
if FindFirst(ServerIDToFileName(aServerIDPrefix+'*', aGlobal), faAnyFile, xRec) = 0 then
if FindFirst(ServerIDToFileName(aServerIDPrefix+AllFilesMask, aGlobal), faAnyFile, xRec) = 0 then
begin
repeat
if (Pos('-', xRec.Name) = 0) and//file that we found is a pending message
@ -372,45 +410,107 @@ end;
{ TIPCClient }
procedure TIPCClient.DeleteRequest;
constructor TIPCClient.Create(aOwner: TComponent);
begin
if DeleteFile(FLastMsgFileName) then
FLastMsgFileName := '';
inherited Create(aOwner);
FLastRequestID := -1;
end;
function TIPCClient.PeekResponse(const aStream: TStream; out
outMsgType: TMessageType; const aTimeOut: Integer): Boolean;
function TIPCClient.DeleteRequest(const aRequestID: Integer): Boolean;
var
xRequestFileName: string;
begin
xRequestFileName := GetRequestFileName(aRequestID);
Result := DeleteFile(xRequestFileName);
if (aRequestID = FLastRequestID) and not FileExists(xRequestFileName) then
FLastRequestID := -1;
end;
function TIPCClient.DeleteRequest: Boolean;
begin
if FLastRequestID >= 0 then
Result := DeleteRequest(FLastRequestID)
else
Result := False;
end;
function TIPCClient.DoPeekResponse(const aResponseFileName: string;
const aStream: TStream; out outMsgType: TMessageType; const aTimeOut: Integer
): Boolean;
var
xStart: QWord;
xStream: TStream;
xMsgLen: Integer;
xFileResponse: string;
begin
aStream.Size := 0;
Result := False;
xStart := GetTickCount64;
repeat
xFileResponse := GetResponseFileName(FLastMsgFileName);
if CanReadMessage(xFileResponse, xStream, outMsgType, xMsgLen) then
begin
if xMsgLen > 0 then
aStream.CopyFrom(xStream, xMsgLen);
xStream.Free;
aStream.Position := 0;
DeleteFile(xFileResponse);
Exit(True);
end
if DoReadMessage(aResponseFileName, aStream, outMsgType) then
Exit(True)
else if aTimeOut > 20 then
Sleep(10);
until (GetTickCount64-xStart > aTimeOut);
end;
function TIPCClient.CreateUniqueRequest(out outFileStream: TFileStream): Integer;
var
xFileName: string;
begin
outFileStream := nil;
EnterCriticalsection(CreateUniqueRequestCritSec);
try
Randomize;
repeat
//if Randomize/Random is started from 2 processes at exactly same moment, it returns the same number! -> prevent duplicates by xor GetProcessId
//the result must be of range 0..$7FFFFFFF (High(Integer))
Result := Integer((PtrInt(Random($7FFFFFFF)) xor {%H-}PtrInt(GetProcessID)) and $7FFFFFFF);
xFileName := GetRequestFileName(Result);
until not RequestExists(xFileName);
outFileStream := TFileStream.Create(xFileName, fmCreate or fmShareExclusive, GLOBAL_RIGHTS);
finally
LeaveCriticalsection(CreateUniqueRequestCritSec);
end;
end;
function TIPCClient.PeekResponse(const aRequestID: Integer;
const aStream: TStream; out outMsgType: TMessageType): Boolean;
begin
Result := DoReadMessage(GetResponseFileName(aRequestID), aStream, outMsgType);
end;
function TIPCClient.PeekResponse(const aRequestID: Integer;
const aStream: TStream; out outMsgType: TMessageType; const aTimeOut: Integer
): Boolean;
begin
Result := DoPeekResponse(GetResponseFileName(aRequestID), aStream, outMsgType, aTimeOut);
end;
function TIPCClient.PeekResponse(const aStream: TStream; out
outMsgType: TMessageType): Boolean;
begin
Result := DoReadMessage(GetResponseFileName(FLastRequestID), aStream, outMsgType);
end;
function TIPCClient.PeekResponse(const aStream: TStream; out
outMsgType: TMessageType; const aTimeOut: Integer): Boolean;
begin
Result := DoPeekResponse(GetResponseFileName(FLastRequestID), aStream, outMsgType, aTimeOut);
end;
function TIPCClient.PostRequest(const aMsgType: TMessageType;
const aStream: TStream): Integer;
var
xRequestFileStream: TFileStream;
begin
Result := GetUniqueRequest(FLastMsgFileName);
DeleteFile(GetResponseFileName(FLastMsgFileName));//delete old response, if there is any
DoPostMessage(FLastMsgFileName, aMsgType, aStream);
xRequestFileStream := nil;
try
Result := CreateUniqueRequest(xRequestFileStream);
DoPostMessage(xRequestFileStream, aMsgType, aStream);
finally
xRequestFileStream.Free;
end;
FLastRequestID := Result;
end;
function TIPCClient.SendRequest(const aMsgType: TMessageType;
@ -459,11 +559,11 @@ end;
procedure TIPCServer.DeletePendingRequests;
var
xRec: {$IF FPC_FULLVERSION>=20701}TRawByteSearchRec{$ELSE}TSearchRec{$ENDIF};
xRec: TIPCSearchRec;
xDir: string;
begin
xDir := ExtractFilePath(FFileName);
if FindFirst(GetRequestPrefix+'*', faAnyFile, xRec) = 0 then
if FindFirst(GetRequestPrefix+AllFilesMask, faAnyFile, xRec) = 0 then
begin
repeat
DeleteFile(xDir+xRec.Name);
@ -472,9 +572,9 @@ begin
FindClose(xRec);
end;
procedure TIPCServer.DeleteRequest(const aRequestID: Integer);
function TIPCServer.DeleteRequest(const aRequestID: Integer): Boolean;
begin
DeleteFile(GetPeekedRequestFileName(aRequestID));
Result := DeleteFile(GetPeekedRequestFileName(aRequestID));
end;
constructor TIPCServer.Create(aOwner: TComponent);
@ -496,14 +596,14 @@ function TIPCServer.FindFirstRequest(out outFileName: string; out
outStream: TStream; out outMsgType: TMessageType; out outMsgLen: Integer
): Integer;
var
xRec: {$IF FPC_FULLVERSION>=20701}TRawByteSearchRec{$ELSE}TSearchRec{$ENDIF};
xRec: TIPCSearchRec;
begin
outFileName := '';
outStream := nil;
outMsgType := -1;
outMsgLen := 0;
Result := -1;
if FindFirst(GetRequestPrefix+'*', faAnyFile, xRec) = 0 then
if FindFirst(GetRequestPrefix+AllFilesMask, faAnyFile, xRec) = 0 then
begin
repeat
Result := RequestFileNameToID(xRec.Name);
@ -520,11 +620,11 @@ end;
function TIPCServer.FindHighestPendingRequestId: Integer;
var
xRec: {$IF FPC_FULLVERSION>=20701}TRawByteSearchRec{$ELSE}TSearchRec{$ENDIF};
xRec: TIPCSearchRec;
xRequestID: LongInt;
begin
Result := -1;
if FindFirst(GetRequestPrefix+'*', faAnyFile, xRec) = 0 then
if FindFirst(GetRequestPrefix+AllFilesMask, faAnyFile, xRec) = 0 then
begin
repeat
xRequestID := RequestFileNameToID(xRec.Name);
@ -537,10 +637,10 @@ end;
function TIPCServer.GetPendingRequestCount: Integer;
var
xRec: {$IF FPC_FULLVERSION>=20701}TRawByteSearchRec{$ELSE}TSearchRec{$ENDIF};
xRec: TIPCSearchRec;
begin
Result := 0;
if FindFirst(GetRequestPrefix+'*', faAnyFile, xRec) = 0 then
if FindFirst(GetRequestPrefix+AllFilesMask, faAnyFile, xRec) = 0 then
begin
repeat
if RequestFileNameToID(xRec.Name) >= 0 then
@ -559,12 +659,17 @@ var
begin
outMsgType := -1;
xMsgFileName := '';
outRequestID := FindFirstRequest(xMsgFileName, xStream, outMsgType, xMsgLen);
Result := outRequestID >= 0;
if Result then
begin
xStream := nil;
try
outRequestID := FindFirstRequest(xMsgFileName, xStream, outMsgType, xMsgLen);
Result := outRequestID >= 0;
if Result then
begin
FreeAndNil(xStream);
RenameFile(xMsgFileName, GetPeekedRequestFileName(xMsgFileName));
end;
finally
xStream.Free;
RenameFile(xMsgFileName, GetPeekedRequestFileName(xMsgFileName));
end;
end;
@ -623,22 +728,9 @@ end;
function TIPCServer.ReadRequest(const aRequestID: Integer; const aStream: TStream
): Boolean;
var
xStream: TStream;
xMsgLen: Integer;
xMsgType: TMessageType;
xFileRequest: string;
begin
aStream.Size := 0;
xFileRequest := GetPeekedRequestFileName(aRequestID);
Result := CanReadMessage(xFileRequest, xStream, xMsgType, xMsgLen);
if Result then
begin
if xMsgLen > 0 then
aStream.CopyFrom(xStream, xMsgLen);
xStream.Free;
aStream.Position := 0;
DeleteFile(xFileRequest);
end;
Result := DoReadMessage(GetPeekedRequestFileName(aRequestID), aStream, xMsgType);
end;
procedure TIPCServer.SetGlobal(const aGlobal: Boolean);
@ -684,4 +776,10 @@ begin
FActive := False;
end;
initialization
InitCriticalSection(CreateUniqueRequestCritSec);
finalization
DoneCriticalsection(CreateUniqueRequestCritSec);
end.