* Patch from Denis Kozlov: Patches here fix several issues with SimpleIPC on Windows. (Bug ID 29346)

Summary of changes:
1. Using SetTimer + GetMessage method for handling PeekMessage.
2. Store all received incoming messages in a message queue (per IPC server instance).
3. Implemented message queue (PeekMessage receives messages, ReadMessage picks from the queue)
4. Implemented optional message queue limit and overflow handling (more on that below).
5. Timeout<-1 is forced to -1; Documented meaning of various Timeout values in comments.
6. Minor refactoring and code formatting changes.

git-svn-id: trunk@32859 -
This commit is contained in:
michael 2016-01-05 20:41:08 +00:00
parent b9da082e29
commit 072b473f9f
3 changed files with 408 additions and 128 deletions

View File

@ -37,6 +37,7 @@ begin
P.Dependencies.add('morphunits',[morphos]);
P.Dependencies.add('arosunits',[aros]);
P.Dependencies.add('amunits',[amiga]);
P.Dependencies.add('fcl-base');
T:=P.Targets.AddUnit('pipes.pp');
T.Dependencies.AddInclude('pipes.inc');

View File

@ -28,7 +28,15 @@ Const
//Message types
mtUnknown = 0;
mtString = 1;
type
TIPCMessageOverflowAction = (ipcmoaNone, ipcmoaDiscardOld, ipcmoaDiscardNew, ipcmoaError);
var
// Currently implemented only for Windows platform!
DefaultIPCMessageOverflowAction: TIPCMessageOverflowAction = ipcmoaNone;
DefaultIPCMessageQueueLimit: Integer = 0;
Type
TMessageType = LongInt;
@ -48,7 +56,7 @@ Type
FOwner : TSimpleIPCServer;
Protected
Function GetInstanceID : String; virtual; abstract;
Procedure DoError(Msg : String; Args : Array of const);
Procedure DoError(const Msg : String; const Args : Array of const);
Procedure SetMsgType(AMsgType: TMessageType);
Function MsgData : TStream;
Public
@ -71,7 +79,7 @@ Type
FBusy: Boolean;
FActive : Boolean;
FServerID : String;
Procedure DoError(Msg : String; Args : Array of const);
Procedure DoError(const Msg: String; const Args: array of const);
Procedure CheckInactive;
Procedure CheckActive;
Procedure Activate; virtual; abstract;
@ -99,13 +107,13 @@ Type
Function CommClass : TIPCServerCommClass; virtual;
Procedure Activate; override;
Procedure Deactivate; override;
Procedure ReadMessage;
Public
Constructor Create(AOwner : TComponent); override;
Destructor Destroy; override;
Procedure StartServer;
Procedure StopServer;
Function PeekMessage(TimeOut : Integer; DoReadMessage : Boolean): Boolean;
Procedure ReadMessage;
Property StringMessage : String Read GetStringMessage;
Procedure GetMessageData(Stream : TStream);
Property MsgType: TMessageType Read FMsgType;
@ -122,7 +130,7 @@ Type
private
FOwner: TSimpleIPCClient;
protected
Procedure DoError(Msg : String; Args : Array of const);
Procedure DoError(const Msg : String; const Args : Array of const);
Public
Constructor Create(AOwner : TSimpleIPCClient); virtual;
Property Owner : TSimpleIPCClient read FOwner;
@ -195,7 +203,7 @@ begin
FOwner:=AOWner;
end;
Procedure TIPCServerComm.DoError(Msg : String; Args : Array of const);
Procedure TIPCServerComm.DoError(const Msg : String; const Args : Array of const);
begin
FOwner.DoError(Msg,Args);
@ -222,7 +230,7 @@ begin
FOwner:=AOwner;
end;
Procedure TIPCClientComm.DoError(Msg : String; Args : Array of const);
Procedure TIPCClientComm.DoError(const Msg : String; const Args : Array of const);
begin
FOwner.DoError(Msg,Args);
@ -232,9 +240,15 @@ end;
TSimpleIPC
---------------------------------------------------------------------}
procedure TSimpleIPC.DoError(Msg: String; Args: array of const);
Procedure TSimpleIPC.DoError(const Msg: String; const Args: array of const);
var
FullMsg: String;
begin
Raise EIPCError.Create(Name+': '+Format(Msg,Args));
if Length(Name) > 0
then FullMsg := Name + ': '
else FullMsg := '';
FullMsg := FullMsg + Format(Msg, Args);
raise EIPCError.Create(FullMsg);
end;
procedure TSimpleIPC.CheckInactive;
@ -351,10 +365,16 @@ begin
FActive:=False;
end;
function TSimpleIPCServer.PeekMessage(TimeOut: Integer; DoReadMessage: Boolean
): Boolean;
// TimeOut values:
// > 0 -- number of milliseconds to wait
// = 0 -- return immediately
// = -1 -- wait infinitely
// < -1 -- wait infinitely (force to -1)
function TSimpleIPCServer.PeekMessage(TimeOut: Integer; DoReadMessage: Boolean): Boolean;
begin
CheckActive;
if TimeOut < -1 then
TimeOut := -1;
FBusy:=True;
Try
Result:=FIPCComm.PeekMessage(Timeout);

View File

@ -14,19 +14,20 @@
**********************************************************************}
uses Windows,messages;
uses Windows,messages,contnrs;
Const
MsgWndClassName : pchar = 'FPCMsgWindowCls';
const
MsgWndClassName: PChar = 'FPCMsgWindowCls';
Resourcestring
resourcestring
SErrFailedToRegisterWindowClass = 'Failed to register message window class';
SErrFailedToCreateWindow = 'Failed to create message window %s';
SErrMessageQueueOverflow = 'Message queue overflow (limit %s)';
var
MsgWindowClass: TWndClassA = (
style: 0;
lpfnWndProc: Nil;
lpfnWndProc: nil;
cbClsExtra: 0;
cbWndExtra: 0;
hInstance: 0;
@ -34,22 +35,60 @@ var
hCursor: 0;
hbrBackground: 0;
lpszMenuName: nil;
lpszClassName: Nil);
{ ---------------------------------------------------------------------
TWinMsgServerComm
---------------------------------------------------------------------}
lpszClassName: nil);
type
TWinMsgServerMsg = class
strict private
FStream: TStream;
FMsgType: TMessageType;
public
constructor Create;
destructor Destroy; override;
property Stream: TStream read FStream;
property MsgType: TMessageType read FMsgType write FMsgType;
end;
TWinMsgServerMsgQueue = class
strict private
FList: TFPObjectList;
FMaxCount: Integer;
FMaxAction: TIPCMessageOverflowAction;
function GetCount: Integer;
procedure DeleteAndFree(Index: Integer);
function PrepareToPush: Boolean;
public
constructor Create;
destructor Destroy; override;
procedure Clear;
procedure Push(AItem: TWinMsgServerMsg);
function Pop: TWinMsgServerMsg;
property Count: Integer read GetCount;
property MaxCount: Integer read FMaxCount write FMaxCount;
property MaxAction: TIPCMessageOverflowAction read FMaxAction write FMaxAction;
end;
Type
TWinMsgServerComm = Class(TIPCServerComm)
Private
strict private
FHWND : HWND;
FWindowName : String;
FDataPushed : Boolean;
FUnction AllocateHWnd(Const aWindowName : String) : HWND;
Public
Constructor Create(AOWner : TSimpleIPCServer); override;
FWndProcException: Boolean;
FWndProcExceptionMsg: String;
FMsgQueue: TWinMsgServerMsgQueue;
function AllocateHWnd(const aWindowName : String) : HWND;
procedure ProcessMessages;
procedure ProcessMessagesWait(TimeOut: Integer);
procedure HandlePostedMessage(const Msg: TMsg); inline;
function HaveQueuedMessages: Boolean; inline;
function CountQueuedMessages: Integer; inline;
procedure CheckWndProcException; inline;
private
procedure ReadMsgData(var Msg: TMsg);
function TryReadMsgData(var Msg: TMsg; out Error: String): Boolean;
procedure SetWndProcException(const ErrorMsg: String); inline;
public
constructor Create(AOwner : TSimpleIPCServer); override;
destructor Destroy; override;
Procedure StartServer; override;
Procedure StopServer; override;
Function PeekMessage(TimeOut : Integer) : Boolean; override;
@ -58,41 +97,143 @@ Type
Property WindowName : String Read FWindowName;
end;
{ ---------------------------------------------------------------------
TWinMsgServerMsg / TWinMsgServerMsgQueue
---------------------------------------------------------------------}
function MsgWndProc(HWindow: HWnd; Message, WParam, LParam: Longint): Longint;stdcall;
Var
I : TWinMsgServerComm;
Msg : TMsg;
constructor TWinMsgServerMsg.Create;
begin
Result:=0;
If (Message=WM_COPYDATA) then
begin
I:=TWinMsgServerComm(GetWindowLongPtr(HWindow,GWL_USERDATA));
If (I<>NIl) then
begin
Msg.Message:=Message;
Msg.WParam:=WParam;
Msg.LParam:=LParam;
I.ReadMsgData(Msg);
I.FDataPushed:=True;
If Assigned(I.Owner.OnMessage) then
I.Owner.ReadMessage;
Result:=1;
end
end
else
Result:=DefWindowProc(HWindow,Message,WParam,LParam);
FMsgType := 0;
FStream := TMemoryStream.Create;
end;
destructor TWinMsgServerMsg.Destroy;
begin
FStream.Free;
end;
function TWinMsgServerComm.AllocateHWnd(const aWindowName: String): HWND;
constructor TWinMsgServerMsgQueue.Create;
begin
FMaxCount := DefaultIPCMessageQueueLimit;
FMaxAction := DefaultIPCMessageOverflowAction;
FList := TFPObjectList.Create(False); // FreeObjects = False!
end;
destructor TWinMsgServerMsgQueue.Destroy;
begin
Clear;
FList.Free;
end;
procedure TWinMsgServerMsgQueue.Clear;
begin
while FList.Count > 0 do
DeleteAndFree(FList.Count - 1);
end;
procedure TWinMsgServerMsgQueue.DeleteAndFree(Index: Integer);
begin
FList[Index].Free; // Free objects manually!
FList.Delete(Index);
end;
function TWinMsgServerMsgQueue.GetCount: Integer;
begin
Result := FList.Count;
end;
function TWinMsgServerMsgQueue.PrepareToPush: Boolean;
begin
Result := True;
case FMaxAction of
ipcmoaDiscardOld:
begin
while (FList.Count >= FMaxCount) do
DeleteAndFree(FList.Count - 1);
end;
ipcmoaDiscardNew:
begin
Result := (FList.Count < FMaxCount);
end;
ipcmoaError:
begin
if (FList.Count >= FMaxCount) then
// Caller is expected to catch this exception, so not using Owner.DoError()
raise EIPCError.CreateFmt(SErrMessageQueueOverflow, [IntToStr(FMaxCount)]);
end;
end;
end;
procedure TWinMsgServerMsgQueue.Push(AItem: TWinMsgServerMsg);
begin
if PrepareToPush then
FList.Insert(0, AItem);
end;
function TWinMsgServerMsgQueue.Pop: TWinMsgServerMsg;
var
Index: Integer;
begin
Index := FList.Count - 1;
if Index >= 0 then
begin
// Caller is responsible for freeing the object.
Result := TWinMsgServerMsg(FList[Index]);
FList.Delete(Index);
end
else
Result := nil;
end;
{ ---------------------------------------------------------------------
MsgWndProc
---------------------------------------------------------------------}
function MsgWndProc(Window: HWND; uMsg: UINT; wParam: WPARAM; lParam: LPARAM): LRESULT; stdcall;
Var
Server: TWinMsgServerComm;
Msg: TMsg;
MsgError: String;
begin
Result:=0;
if (uMsg=WM_COPYDATA) then
begin
// Post WM_USER to wake up GetMessage call.
PostMessage(Window, WM_USER, 0, 0);
// Read message data and add to message queue.
Server:=TWinMsgServerComm(GetWindowLongPtr(Window,GWL_USERDATA));
if Assigned(Server) then
begin
Msg.Message:=uMsg;
Msg.wParam:=wParam;
Msg.lParam:=lParam;
// Exceptions thrown inside WindowProc may not propagate back
// to the caller in some circumstances (according to MSDN),
// so capture it and raise it outside of WindowProc!
if Server.TryReadMsgData(Msg, MsgError) then
Result:=1 // True
else
begin
Result:=0; // False
Server.SetWndProcException(MsgError);
end;
end;
end
else
begin
Result:=DefWindowProc(Window,uMsg,wParam,lParam);
end;
end;
{ ---------------------------------------------------------------------
TWinMsgServerComm
---------------------------------------------------------------------}
function TWinMsgServerComm.AllocateHWnd(const aWindowName: String): HWND;
var
cls: TWndClassA;
isreg : Boolean;
begin
Pointer(MsgWindowClass.lpfnWndProc):=@MsgWndProc;
MsgWindowClass.hInstance := HInstance;
@ -108,84 +249,198 @@ begin
SetWindowLongPtr(Result,GWL_USERDATA,PtrInt(Self));
end;
constructor TWinMsgServerComm.Create(AOWner: TSimpleIPCServer);
constructor TWinMsgServerComm.Create(AOwner: TSimpleIPCServer);
begin
inherited Create(AOWner);
FWindowName:=Owner.ServerID;
inherited Create(AOwner);
FWindowName := Owner.ServerID;
If not Owner.Global then
FWindowName:=FWindowName+'_'+InstanceID;
FWindowName := FWindowName+'_'+InstanceID;
FWndProcException := False;
FWndProcExceptionMsg := '';
FMsgQueue := TWinMsgServerMsgQueue.Create;
end;
destructor TWinMsgServerComm.Destroy;
begin
StopServer;
FMsgQueue.Free;
inherited;
end;
procedure TWinMsgServerComm.StartServer;
begin
FHWND:=AllocateHWND(FWindowName);
StopServer;
FHWND := AllocateHWND(FWindowName);
end;
procedure TWinMsgServerComm.StopServer;
begin
DestroyWindow(FHWND);
FHWND:=0;
FMsgQueue.Clear;
if FHWND <> 0 then
begin
DestroyWindow(FHWND);
FHWND := 0;
end;
end;
procedure TWinMsgServerComm.SetWndProcException(const ErrorMsg: String); inline;
begin
FWndProcException := True;
FWndProcExceptionMsg := ErrorMsg;
end;
procedure TWinMsgServerComm.CheckWndProcException; inline;
var
Msg: String;
begin
if FWndProcException then
begin
Msg := FWndProcExceptionMsg;
FWndProcException := False;
FWndProcExceptionMsg := '';
Owner.DoError(Msg, []);
end;
end;
function TWinMsgServerComm.HaveQueuedMessages: Boolean; inline;
begin
Result := (FMsgQueue.Count > 0);
end;
function TWinMsgServerComm.CountQueuedMessages: Integer; inline;
begin
Result := FMsgQueue.Count;
end;
procedure TWinMsgServerComm.HandlePostedMessage(const Msg: TMsg); inline;
begin
if Msg.message <> WM_USER then
begin
TranslateMessage(Msg);
DispatchMessage(Msg);
end
end;
procedure TWinMsgServerComm.ProcessMessages;
var
Msg: TMsg;
begin
// Windows.PeekMessage dispatches incoming sent messages by directly
// calling associated WindowProc, and then checks the thread message queue
// for posted messages and retrieves a message if any available.
// Note: WM_COPYDATA is a sent message, not posted, so it will be processed
// directly via WindowProc inside of Windows.PeekMessage call.
while Windows.PeekMessage(Msg, FHWND, 0, 0, PM_REMOVE) do
begin
// Empty the message queue by processing all posted messages.
HandlePostedMessage(Msg);
end;
end;
procedure TWinMsgServerComm.ProcessMessagesWait(TimeOut: Integer);
var
Msg: TMsg;
TimerID: UINT_PTR;
GetMessageReturn: BOOL;
begin
// Not allowed to wait.
if TimeOut = 0 then
Exit;
// Setup a timer to post WM_TIMER to wake up GetMessage call.
if TimeOut > 0 then
TimerID := SetTimer(FHWND, 0, TimeOut, nil)
else
TimerID := 0;
// Wait until a message arrives.
try
// We either need to wait infinitely or we have a timer.
if (TimeOut < 0) or (TimerID <> 0) then
begin
// Windows.GetMessage dispatches incoming sent messages until a posted
// message is available for retrieval. Note: WM_COPYDATA will not actually
// wake up Windows.GetMessage, so we must post a dummy message when
// we receive WM_COPYDATA inside of WindowProc.
GetMessageReturn := GetMessage(Msg, FHWND, 0, 0);
case LongInt(GetMessageReturn) of
-1, 0: ;
else HandlePostedMessage(Msg);
end;
end;
finally
// Destroy timer.
if TimerID <> 0 then
KillTimer(FHWND, TimerID);
end;
end;
function TWinMsgServerComm.PeekMessage(TimeOut: Integer): Boolean;
Var
Msg : Tmsg;
B : Boolean;
R : DWORD;
begin
Result:=FDataPushed;
If Result then
Exit;
B:=Windows.PeekMessage(Msg, FHWND, 0, 0, PM_NOREMOVE);
If not B then
// No message yet. Wait for a message to arrive available within specified time.
begin
if (TimeOut=0) then
TimeOut:=Integer(INFINITE);
R:=MsgWaitForMultipleObjects(1,FHWND,False,TimeOut,QS_SENDMESSAGE);
B:=(R<>WAIT_TIMEOUT);
end;
If B then
Repeat
B:=Windows.PeekMessage(Msg, FHWND, 0, 0, PM_NOREMOVE);
if B then
begin
Result:=(Msg.Message=WM_COPYDATA);
// Remove non WM_COPY messages from Queue
if not Result then
GetMessage(Msg,FHWND,0,0);
end;
Until Result or (not B);
// Process incoming messages.
ProcessMessages;
// Do we have queued messages?
Result := HaveQueuedMessages;
// Wait for incoming messages.
if (not Result) and (TimeOut <> 0) then
begin
ProcessMessagesWait(TimeOut);
Result := HaveQueuedMessages;
end;
// Check for exception raised inside WindowProc.
CheckWndProcException;
end;
procedure TWinMsgServerComm.ReadMsgData(var Msg: TMsg);
Var
CDS : PCopyDataStruct;
var
CDS: PCopyDataStruct;
MsgItem: TWinMsgServerMsg;
begin
CDS:=PCopyDataStruct(Msg.Lparam);
Owner.FMsgType:=CDS^.dwData;
Owner.FMsgData.Size:=0;
Owner.FMsgData.Seek(0,soFrombeginning);
Owner.FMsgData.WriteBuffer(CDS^.lpData^,CDS^.cbData);
CDS := PCopyDataStruct(Msg.lParam);
MsgItem := TWinMsgServerMsg.Create;
try
MsgItem.MsgType := CDS^.dwData;
MsgItem.Stream.WriteBuffer(CDS^.lpData^,CDS^.cbData);
except
FreeAndNil(MsgItem);
// Caller is expected to catch this exception, so not using Owner.DoError()
raise;
end;
FMsgQueue.Push(MsgItem);
end;
function TWinMsgServerComm.TryReadMsgData(var Msg: TMsg; out Error: String): Boolean;
begin
Result := True;
try
ReadMsgData(Msg);
except on E: Exception do
begin
Result := False;
Error := E.Message;
end;
end;
end;
procedure TWinMsgServerComm.ReadMessage;
Var
Msg : TMsg;
var
MsgItem: TWinMsgServerMsg;
begin
If FDataPushed then
FDataPushed:=False
else
If Windows.PeekMessage(Msg, FHWND, 0, 0, PM_REMOVE) then
if (Msg.Message=WM_COPYDATA) then
ReadMsgData(Msg);
MsgItem := FMsgQueue.Pop;
if Assigned(MsgItem) then
try
// Load message from the queue into the owner's message data.
MsgItem.Stream.Position := 0;
Owner.FMsgData.Size := 0;
Owner.FMsgType := MsgItem.MsgType;
Owner.FMsgData.CopyFrom(MsgItem.Stream, MsgItem.Stream.Size);
finally
// We are responsible for freeing the message from the queue.
MsgItem.Free;
end;
end;
function TWinMsgServerComm.GetInstanceID: String;
@ -201,7 +456,8 @@ Type
TWinMsgClientComm = Class(TIPCClientComm)
Private
FWindowName: String;
FHWND : HWnd;
FHWND : HWND;
function FindServerWindow: HWND;
Public
Constructor Create(AOWner : TSimpleIPCClient); override;
Procedure Connect; override;
@ -220,9 +476,14 @@ begin
FWindowName:=FWindowName+'_'+Owner.ServerInstance;
end;
function TWinMsgClientComm.FindServerWindow: HWND;
begin
Result := FindWindowA(MsgWndClassName,PChar(FWindowName));
end;
procedure TWinMsgClientComm.Connect;
begin
FHWND:=FindWindowA(MsgWndClassName,PChar(FWindowName));
FHWND:=FindServerWindow;
If (FHWND=0) then
Owner.DoError(SErrServerNotActive,[Owner.ServerID]);
end;
@ -232,41 +493,39 @@ begin
FHWND:=0;
end;
procedure TWinMsgClientComm.SendMessage(MsgType: TMessageType; Stream: TStream
);
Var
procedure TWinMsgClientComm.SendMessage(MsgType: TMessageType; Stream: TStream);
var
CDS : TCopyDataStruct;
Data,FMemstr : TMemorySTream;
begin
If Stream is TMemoryStream then
begin
if Stream is TMemoryStream then
begin
Data:=TMemoryStream(Stream);
FMemStr:=Nil
end
FMemStr:=nil;
end
else
begin
begin
FMemStr:=TMemoryStream.Create;
Data:=FMemstr;
end;
Try
If Assigned(FMemStr) then
begin
end;
try
if Assigned(FMemStr) then
begin
FMemStr.CopyFrom(Stream,0);
FMemStr.Seek(0,soFromBeginning);
end;
end;
CDS.dwData:=MsgType;
CDS.lpData:=Data.Memory;
CDS.cbData:=Data.Size;
Windows.SendMessage(FHWnd,WM_COPYDATA,0,PtrInt(@CDS));
Finally
Windows.SendMessage(FHWND,WM_COPYDATA,0,PtrInt(@CDS));
finally
FreeAndNil(FMemStr);
end;
end;
function TWinMsgClientComm.ServerRunning: Boolean;
begin
Result:=FindWindowA(MsgWndClassName,PChar(FWindowName))<>0;
Result:=FindServerWindow<>0;
end;
{ ---------------------------------------------------------------------