diff --git a/.gitattributes b/.gitattributes index bce6a185f2..bd0643ac0e 100644 --- a/.gitattributes +++ b/.gitattributes @@ -2676,6 +2676,8 @@ packages/fcl-process/examples/ipcserver.lpi svneol=native#text/plain packages/fcl-process/examples/ipcserver.pp svneol=native#text/plain packages/fcl-process/examples/simpleipcserver.lpi svneol=native#text/plain packages/fcl-process/examples/simpleipcserver.lpr svneol=native#text/plain +packages/fcl-process/examples/threadedipc.lpi svneol=native#text/plain +packages/fcl-process/examples/threadedipc.lpr svneol=native#text/plain packages/fcl-process/examples/waitonexit.pp svneol=native#text/pascal packages/fcl-process/fpmake.pp svneol=native#text/plain packages/fcl-process/src/amicommon/pipes.inc svneol=native#text/plain diff --git a/packages/fcl-process/examples/threadedipc.lpi b/packages/fcl-process/examples/threadedipc.lpi new file mode 100644 index 0000000000..42dd6a5de2 --- /dev/null +++ b/packages/fcl-process/examples/threadedipc.lpi @@ -0,0 +1,67 @@ +<?xml version="1.0" encoding="UTF-8"?> +<CONFIG> + <ProjectOptions> + <Version Value="10"/> + <PathDelim Value="\"/> + <General> + <Flags> + <MainUnitHasCreateFormStatements Value="False"/> + <MainUnitHasTitleStatement Value="False"/> + </Flags> + <SessionStorage Value="InProjectDir"/> + <MainUnit Value="0"/> + <Title Value="threadedipc"/> + <UseAppBundle Value="False"/> + <ResourceType Value="res"/> + </General> + <VersionInfo> + <StringTable ProductVersion=""/> + </VersionInfo> + <BuildModes Count="1"> + <Item1 Name="Default" Default="True"/> + </BuildModes> + <PublishOptions> + <Version Value="2"/> + </PublishOptions> + <RunParams> + <local> + <FormatVersion Value="1"/> + </local> + </RunParams> + <Units Count="1"> + <Unit0> + <Filename Value="threadedipc.lpr"/> + <IsPartOfProject Value="True"/> + </Unit0> + </Units> + </ProjectOptions> + <CompilerOptions> + <Version Value="11"/> + <PathDelim Value="\"/> + <Target> + <Filename Value="threadedipc"/> + </Target> + <SearchPaths> + <IncludeFiles Value="$(ProjOutDir)"/> + <UnitOutputDirectory Value="lib\$(TargetCPU)-$(TargetOS)"/> + </SearchPaths> + <Linking> + <Debugging> + <UseExternalDbgSyms Value="True"/> + </Debugging> + </Linking> + </CompilerOptions> + <Debugging> + <Exceptions Count="3"> + <Item1> + <Name Value="EAbort"/> + </Item1> + <Item2> + <Name Value="ECodetoolError"/> + </Item2> + <Item3> + <Name Value="EFOpenError"/> + </Item3> + </Exceptions> + </Debugging> +</CONFIG> diff --git a/packages/fcl-process/examples/threadedipc.lpr b/packages/fcl-process/examples/threadedipc.lpr new file mode 100644 index 0000000000..67f1b7411e --- /dev/null +++ b/packages/fcl-process/examples/threadedipc.lpr @@ -0,0 +1,111 @@ +program ThreadedIPC; + +{$mode objfpc}{$H+} + +uses + {$IFDEF UNIX}cthreads,{$ENDIF} + SysUtils, Classes, Math, FGL, SimpleIPC; + +const + ServerUniqueID = '39693DC0-BD8B-4AAD-9D9B-387D37CD59FD'; + ServerTimeout = 5000; + ClientDelayMin = 500; + ClientDelayMax = 3000; + ClientCount = 10; + +var + ServerThreaded: Boolean = True; + +type + TServerMessageHandler = class + public + procedure HandleMessage(Sender: TObject); + procedure HandleMessageQueued(Sender: TObject); + end; + +procedure TServerMessageHandler.HandleMessage(Sender: TObject); +begin + WriteLn(TSimpleIPCServer(Sender).StringMessage); +end; + +procedure TServerMessageHandler.HandleMessageQueued(Sender: TObject); +begin + TSimpleIPCServer(Sender).ReadMessage; +end; + +procedure ServerWorker; +var + Server: TSimpleIPCServer; + MessageHandler: TServerMessageHandler; +begin + WriteLn(Format('Starting server #%x', [GetThreadID])); + MessageHandler := TServerMessageHandler.Create; + Server := TSimpleIPCServer.Create(nil); + try + Server.ServerID := ServerUniqueID; + Server.Global := True; + Server.OnMessage := @MessageHandler.HandleMessage; + Server.OnMessageQueued := @MessageHandler.HandleMessageQueued; + Server.StartServer(ServerThreaded); + if ServerThreaded then + Sleep(ServerTimeout) + else + while Server.PeekMessage(ServerTimeout, True) do ; + except on E: Exception do + WriteLn('Server error: ' + E.Message); + end; + Server.Free; + MessageHandler.Free; + WriteLn(Format('Finished server #%x', [GetThreadID])); +end; + +procedure ClientWorker; +var + Client: TSimpleIPCClient; + Message: String; +begin + WriteLn(Format('Starting client #%x', [GetThreadID])); + Client := TSimpleIPCClient.Create(nil); + try + Client.ServerID := ServerUniqueID; + while not Client.ServerRunning do + Sleep(100); + Client.Active := True; + Sleep(RandomRange(ClientDelayMin, ClientDelayMax)); + Message := Format('Hello from client #%x', [GetThreadID]); + Client.SendStringMessage(Message); + except on E: Exception do + WriteLn('Client error: ' + E.Message); + end; + Client.Free; + WriteLn(Format('Finished client #%x', [GetThreadID])); +end; + +type + TThreadList = specialize TFPGObjectList<TThread>; + +var + I: Integer; + Thread: TThread; + Threads: TThreadList; + +begin + Randomize; + WriteLn('Threaded server: ' + BoolToStr(ServerThreaded, 'YES', 'NO')); + Threads := TThreadList.Create(True); + try + Threads.Add(TThread.CreateAnonymousThread(@ServerWorker)); + for I := 1 to ClientCount do + Threads.Add(TThread.CreateAnonymousThread(@ClientWorker)); + for Thread in Threads do + begin + Thread.FreeOnTerminate := False; + Thread.Start; + end; + for Thread in Threads do + Thread.WaitFor; + finally + Threads.Free; + end; +end. + diff --git a/packages/fcl-process/src/amicommon/simpleipc.inc b/packages/fcl-process/src/amicommon/simpleipc.inc index 00104c152c..80f590d1e2 100644 --- a/packages/fcl-process/src/amicommon/simpleipc.inc +++ b/packages/fcl-process/src/amicommon/simpleipc.inc @@ -234,6 +234,7 @@ Procedure TAmigaServerComm.ReadMessage; var Temp: PByte; MsgType: TMessageType; + Msg: TIPCServerMsg; begin if Assigned(MsgBody) then begin @@ -241,11 +242,18 @@ begin Inc(Temp, SizeOf(Exec.TMessage)); MsgType := 0; Move(Temp^, MsgType, SizeOf(TMessageType)); - Inc(Temp, SizeOf(TMessageType)); - Owner.FMsgType := MsgType; - Owner.FMsgData.Size := 0; - Owner.FMsgData.Seek(0, soFrombeginning); - Owner.FMsgData.WriteBuffer(temp^, MsgBody^.mn_Length); + Inc(Temp, SizeOf(TMessageType)); + + Msg := TIPCServerMsg.Create; + try + Msg.MsgType := MsgType; + Msg.Stream.WriteBuffer(Temp^, MsgBody^.mn_Length); + except + FreeAndNil(Msg); + raise; + end; + PushMessage(Msg); + System.FreeMem(MsgBody); MsgBody := nil; end; diff --git a/packages/fcl-process/src/simpleipc.pp b/packages/fcl-process/src/simpleipc.pp index 9b5e34fd0d..ae10245dae 100644 --- a/packages/fcl-process/src/simpleipc.pp +++ b/packages/fcl-process/src/simpleipc.pp @@ -20,26 +20,20 @@ unit simpleipc; interface uses - Contnrs, Classes, SysUtils; + Contnrs, SyncObjs, Classes, SysUtils; -Const +const MsgVersion = 1; - DefaultThreadTimeOut = 50; - //Message types + { IPC message types } mtUnknown = 0; mtString = 1; type TIPCMessageOverflowAction = (ipcmoaNone, ipcmoaDiscardOld, ipcmoaDiscardNew, ipcmoaError); -var - DefaultIPCMessageOverflowAction: TIPCMessageOverflowAction = ipcmoaNone; - DefaultIPCMessageQueueLimit: Integer = 0; - -Type - TMessageType = LongInt; + TMsgHeader = Packed record Version : Byte; MsgType : TMessageType; @@ -49,17 +43,29 @@ Type TSimpleIPCServer = class; TSimpleIPCClient = class; + { TIPCServerMsg } TIPCServerMsg = class + private type + TStreamClass = class of TStream; + private const + // TMemoryStream uses an effecient grow algorithm. + DefaultStreamClass: TStreamClass = TMemoryStream; strict private FStream: TStream; + FOwnsStream: Boolean; FMsgType: TMessageType; + function GetStringMessage: String; public constructor Create; + constructor Create(AStream: TStream; AOwnsStream: Boolean = True); destructor Destroy; override; property Stream: TStream read FStream; property MsgType: TMessageType read FMsgType write FMsgType; + property OwnsStream: Boolean read FOwnsStream write FOwnsStream; + property StringMessage: String read GetStringMessage; end; + { TIPCServerMsgQueue } TIPCServerMsgQueue = class strict private FList: TFPObjectList; @@ -80,7 +86,6 @@ Type end; { TIPCServerComm } - TIPCServerComm = Class(TObject) Private FOwner : TSimpleIPCServer; @@ -94,10 +99,10 @@ Type Property Owner : TSimpleIPCServer read FOwner; Procedure StartServer; virtual; Abstract; Procedure StopServer;virtual; Abstract; - // May push messages on the queue - Function PeekMessage(TimeOut : Integer) : Boolean;virtual; Abstract; - // Must put message on the queue. - Procedure ReadMessage ;virtual; Abstract; + // Check for new messages, may read and push messages to the queue. + Function PeekMessage(Timeout: Integer): Boolean; virtual; Abstract; + // Read and push new message to the queue, if not done by PeekMessage. + Procedure ReadMessage; virtual; Abstract; Property InstanceID : String read GetInstanceID; end; TIPCServerCommClass = Class of TIPCServerComm; @@ -111,6 +116,7 @@ Type FBusy: Boolean; FActive : Boolean; FServerID : String; + procedure PrepareServerID; Procedure DoError(const Msg: String; const Args: array of const); Procedure CheckInactive; Procedure CheckActive; @@ -123,26 +129,44 @@ Type Property ServerID : String Read FServerID Write SetServerID; end; + TMessageQueueEvent = Procedure(Sender: TObject; Msg: TIPCServerMsg) of object; + { TSimpleIPCServer } - - TMessageQueueEvent = Procedure(Sender : TObject; Msg : TIPCServerMsg) of object; - TSimpleIPCServer = Class(TSimpleIPC) - protected - Private + private const + DefaultThreaded = False; + DefaultThreadTimeout = 50; + DefaultSynchronizeEvents = True; + DefaultMaxAction = ipcmoaNone; + DefaultMaxQueue = 0; + private FOnMessageError: TMessageQueueEvent; FOnMessageQueued: TNotifyEvent; - FQueue : TIPCServerMsgQueue; - FGlobal: Boolean; FOnMessage: TNotifyEvent; - FMsgType: TMessageType; - FMsgData : TStream; - FThreadTimeOut: Integer; - FThread : TThread; - FLock : TRTLCriticalSection; - FErrMsg : TIPCServerMsg; - procedure DoMessageQueued; - procedure DoMessageError; + FOnThreadError: TNotifyEvent; + FQueue: TIPCServerMsgQueue; + FQueueLock: TCriticalSection; + FQueueAddEvent: TSimpleEvent; + FGlobal: Boolean; + // Access to the message is not locked by design! + // In the threaded mode, it must be accessed only during event callbacks. + FMessage: TIPCServerMsg; + FTempMessage: TIPCServerMsg; + FThreaded: Boolean; + FThreadTimeout: Integer; + FThreadError: String; + FThreadExecuting: Boolean; + FThreadReadyEvent: TSimpleEvent; + FThread: TThread; + FSynchronizeEvents: Boolean; + procedure DoOnMessage; + procedure DoOnMessageQueued; + procedure DoOnMessageError(Msg: TIPCServerMsg); + procedure DoOnThreadError; + procedure InternalDoOnMessage; + procedure InternalDoOnMessageQueued; + procedure InternalDoOnMessageError; + procedure InternalDoOnThreadError; function GetInstanceID: String; function GetMaxAction: TIPCMessageOverflowAction; function GetMaxQueue: Integer; @@ -150,31 +174,43 @@ Type procedure SetGlobal(const AValue: Boolean); procedure SetMaxAction(AValue: TIPCMessageOverflowAction); procedure SetMaxQueue(AValue: Integer); - Protected + procedure SetThreaded(AValue: Boolean); + procedure SetThreadTimeout(AValue: Integer); + procedure SetSynchronizeEvents(AValue: Boolean); + function WaitForReady(Timeout: Integer = -1): Boolean; + function GetMsgType: TMessageType; + function GetMsgData: TStream; + protected FIPCComm: TIPCServerComm; - procedure StartThread; virtual; - procedure StopThread; virtual; Function CommClass : TIPCServerCommClass; virtual; Procedure PushMessage(Msg : TIPCServerMsg); virtual; function PopMessage: Boolean; virtual; + procedure StartComm; virtual; + procedure StopComm; virtual; + function StartThread: Boolean; virtual; + procedure StopThread; virtual; Procedure Activate; override; Procedure Deactivate; override; + function ProcessMessage(Timeout: Integer): Boolean; Property Queue : TIPCServerMsgQueue Read FQueue; Property Thread : TThread Read FThread; Public Constructor Create(AOwner : TComponent); override; Destructor Destroy; override; - Procedure StartServer(Threaded : Boolean = False); + Procedure StartServer; + Procedure StartServer(AThreaded: Boolean); Procedure StopServer; - Function PeekMessage(TimeOut : Integer; DoReadMessage : Boolean): Boolean; - Procedure ReadMessage; + Function PeekMessage(Timeout: Integer; DoReadMessage: Boolean): Boolean; + Function ReadMessage: Boolean; Property StringMessage : String Read GetStringMessage; Procedure GetMessageData(Stream : TStream); - Property MsgType: TMessageType Read FMsgType; - Property MsgData : TStream Read FMsgData; + Property Message: TIPCServerMsg read FMessage; + Property MsgType: TMessageType Read GetMsgType; + Property MsgData: TStream Read GetMsgData; Property InstanceID : String Read GetInstanceID; + property ThreadExecuting: Boolean read FThreadExecuting; + property ThreadError: String read FThreadError; Published - Property ThreadTimeOut : Integer Read FThreadTimeOut Write FThreadTimeOut; Property Global : Boolean Read FGlobal Write SetGlobal; // Called during ReadMessage Property OnMessage : TNotifyEvent Read FOnMessage Write FOnMessage; @@ -182,14 +218,21 @@ Type Property OnMessageQueued : TNotifyEvent Read FOnMessageQueued Write FOnMessageQueued; // Called when the queue overflows and MaxAction = ipcmoaError. Property OnMessageError : TMessageQueueEvent Read FOnMessageError Write FOnMessageError; + // Called when the server thread catches an exception. + property OnThreadError: TNotifyEvent read FOnThreadError write FOnThreadError; // Maximum number of messages to keep in the queue - property MaxQueue: Integer read GetMaxQueue write SetMaxQueue; + property MaxQueue: Integer read GetMaxQueue write SetMaxQueue default DefaultMaxQueue; // What to do when the queue overflows - property MaxAction: TIPCMessageOverflowAction read GetMaxAction write SetMaxAction; + property MaxAction: TIPCMessageOverflowAction read GetMaxAction write SetMaxAction default DefaultMaxAction; + // Instruct IPC server to operate in a threaded mode. + property Threaded: Boolean read FThreaded write SetThreaded; + // Amount of time thread waits for a message before checking for termination. + property ThreadTimeout: Integer read FThreadTimeout write SetThreadTimeout default DefaultThreadTimeout; + // Synchronize events with the main thread when in threaded mode. + property SynchronizeEvents: Boolean read FSynchronizeEvents write SetSynchronizeEvents default DefaultSynchronizeEvents; end; - - { TIPCClientComm} + { TIPCClientComm } TIPCClientComm = Class(TObject) private FOwner: TSimpleIPCClient; @@ -229,17 +272,23 @@ Type Property ServerInstance : String Read FServerInstance Write SetServerInstance; end; - EIPCError = Class(Exception); -Var +var DefaultIPCServerClass : TIPCServerCommClass = Nil; DefaultIPCClientClass : TIPCClientCommClass = Nil; +var + DefaultIPCMessageOverflowAction: TIPCMessageOverflowAction = TSimpleIPCServer.DefaultMaxAction; + DefaultIPCMessageQueueLimit: Integer = TSimpleIPCServer.DefaultMaxQueue; + resourcestring SErrServerNotActive = 'Server with ID %s is not active.'; SErrActive = 'This operation is illegal when the server is active.'; SErrInActive = 'This operation is illegal when the server is inactive.'; + SErrThreadContext = 'This operation is illegal outside of IPC thread context.'; + SErrThreadFailure = 'IPC thread failure.'; + SErrMessageQueueOverflow = 'Message queue overflow (limit %s)'; implementation @@ -252,34 +301,107 @@ implementation This comes first, to allow the uses clause to be set. If the include file defines OSNEEDIPCINITDONE then the unit will call IPCInit and IPCDone in the initialization/finalization code. - --------------------------------------------------------------------- } {$UNDEF OSNEEDIPCINITDONE} {$i simpleipc.inc} -Resourcestring - SErrMessageQueueOverflow = 'Message queue overflow (limit %s)'; +// Convert content of any stream type to a string. +function FastStreamToString(Stream: TStream): String; +var + CharCount, CharSize: Integer; + StringStream: TStringStream; + OldPosition: Int64; +begin + // Optimized for TStringStream + if Stream is TStringStream then + begin + Result := TStringStream(Stream).DataString; + end + // Optimized for TCustomMemoryStream + else if Stream is TCustomMemoryStream then + begin + Result := ''; + CharSize := StringElementSize(Result); + CharCount := Stream.Size div CharSize; + SetLength(Result, CharCount); + Move(TCustomMemoryStream(Stream).Memory^, Result[1], CharCount * CharSize); + end + // Any other stream type + else + begin + OldPosition := Stream.Position; + try + StringStream := TStringStream.Create(''); + try + Stream.Position := 0; + StringStream.CopyFrom(Stream, Stream.Size); + Result := StringStream.DataString; + finally + StringStream.Free; + end; + finally + Stream.Position := OldPosition; + end; + end; +end; -{ --------------------------------------------------------------------- - TIPCServerMsg - ---------------------------------------------------------------------} +// Timeout values: +// > 0 -- Number of milliseconds to wait +// = 0 -- return immediately +// = -1 -- wait infinitely (converted to INFINITE) +// < -1 -- wait infinitely (converted to INFINITE) +function IPCTimeoutToEventTimeout(Timeout: Integer): Cardinal; inline; +begin + if Timeout >= 0 then + Result := Timeout + else + Result := SyncObjs.INFINITE; +end; +// Timeout values: +// > 0 -- Number of milliseconds to wait +// = 0 -- return immediately +// = -1 -- wait infinitely +// < -1 -- wait infinitely (force to -1) +function IPCTimeoutSanitized(Timeout: Integer): Integer; inline; +begin + if Timeout >= 0 then + Result := Timeout + else + Result := -1; +end; + +{$REGION 'TIPCServerMsg'} constructor TIPCServerMsg.Create; begin - FMsgType := 0; - FStream := TMemoryStream.Create; + FMsgType := mtUnknown; + FStream := Self.DefaultStreamClass.Create; + FOwnsStream := True; +end; + +constructor TIPCServerMsg.Create(AStream: TStream; AOwnsStream: Boolean); +begin + FMsgType := mtUnknown; + FStream := AStream; + FOwnsStream := AOwnsStream; end; destructor TIPCServerMsg.Destroy; begin - FStream.Free; + if FOwnsStream then + FreeAndNil(FStream); end; -{ --------------------------------------------------------------------- - TIPCServerMsgQueue - ---------------------------------------------------------------------} +function TIPCServerMsg.GetStringMessage: String; +begin + Result := FastStreamToString(FStream); +end; + +{$ENDREGION} + +{$REGION 'TIPCServerMsgQueue'} constructor TIPCServerMsgQueue.Create; begin @@ -336,6 +458,7 @@ end; procedure TIPCServerMsgQueue.Push(AItem: TIPCServerMsg); begin + // PrepareToPush may throw an exception, e.g. if message queue is full. if PrepareToPush then FList.Insert(0, AItem); end; @@ -355,10 +478,9 @@ begin Result := nil; end; +{$ENDREGION} -{ --------------------------------------------------------------------- - TIPCServerComm - ---------------------------------------------------------------------} +{$REGION 'TIPCServerComm'} constructor TIPCServerComm.Create(AOwner: TSimpleIPCServer); begin @@ -366,16 +488,13 @@ begin end; procedure TIPCServerComm.DoError(const Msg: String; const Args: array of const); - begin FOwner.DoError(Msg,Args); end; procedure TIPCServerComm.PushMessage(const Hdr: TMsgHeader; AStream: TStream); - -Var +var M : TIPCServerMsg; - begin M:=TIPCServerMsg.Create; try @@ -394,9 +513,9 @@ begin FOwner.PushMessage(Msg); end; -{ --------------------------------------------------------------------- - TIPCClientComm - ---------------------------------------------------------------------} +{$ENDREGION} + +{$REGION 'TIPCClientComm'} constructor TIPCClientComm.Create(AOwner: TSimpleIPCClient); begin @@ -409,9 +528,9 @@ begin FOwner.DoError(Msg,Args); end; -{ --------------------------------------------------------------------- - TSimpleIPC - ---------------------------------------------------------------------} +{$ENDREGION} + +{$REGION 'TSimpleIPC'} Procedure TSimpleIPC.DoError(const Msg: String; const Args: array of const); var @@ -441,7 +560,7 @@ end; procedure TSimpleIPC.SetActive(const AValue: Boolean); begin if (FActive<>AValue) then - begin + begin if ([]<>([csLoading,csDesigning]*ComponentState)) then FActive:=AValue else @@ -449,36 +568,89 @@ begin Activate else Deactivate; - end; + end; end; procedure TSimpleIPC.SetServerID(const AValue: String); begin if (FServerID<>AValue) then - begin + begin CheckInactive; - FServerID:=AValue - end; + FServerID:=AValue; + end; end; -Procedure TSimpleIPC.Loaded; - -Var - B : Boolean; - +procedure TSimpleIPC.PrepareServerID; begin - Inherited; + if FServerID = '' then + FServerID := ApplicationName; + // Extra precaution for thread-safety + UniqueString(FServerID); +end; + +procedure TSimpleIPC.Loaded; +var + B : Boolean; +begin + inherited; B:=FActive; if B then - begin - Factive:=False; + begin + FActive:=False; Activate; - end; + end; end; -{ --------------------------------------------------------------------- - TSimpleIPCServer - ---------------------------------------------------------------------} +{$ENDREGION} + +{$REGION 'TIPCServerThread'} + +type + TIPCServerThread = class(TThread) + private + FServer: TSimpleIPCServer; + protected + procedure Execute; override; + public + constructor Create(AServer: TSimpleIPCServer); + property Server: TSimpleIPCServer read FServer; + end; + +constructor TIPCServerThread.Create(AServer: TSimpleIPCServer); +begin + inherited Create(True); // CreateSuspended = True + FServer := AServer; +end; + +procedure TIPCServerThread.Execute; +begin + FServer.FThreadExecuting := True; + try + FServer.StartComm; + try + // Notify server that thread has started. + FServer.FThreadReadyEvent.SetEvent; + // Run message loop + while not Terminated do + FServer.ProcessMessage(FServer.ThreadTimeout); + finally + FServer.StopComm; + end; + except on E: Exception do + begin + FServer.FThreadExecuting := False; + FServer.FThreadError := E.Message; + // Trigger event to wake up the caller from potentially indefinite wait. + FServer.FThreadReadyEvent.SetEvent; + FServer.DoOnThreadError; + end; + end; + FServer.FThreadExecuting := False; +end; + +{$ENDREGION} + +{$REGION 'TSimpleIPCServer'} constructor TSimpleIPCServer.Create(AOwner: TComponent); begin @@ -486,35 +658,55 @@ begin FGlobal:=False; FActive:=False; FBusy:=False; - FMsgData:=TStringStream.Create(''); + FMessage:=nil; FQueue:=TIPCServerMsgQueue.Create; - FThreadTimeOut:=DefaultThreadTimeOut; + FThreaded:=DefaultThreaded; + FThreadTimeout:=DefaultThreadTimeout; + FSynchronizeEvents:=DefaultSynchronizeEvents; end; destructor TSimpleIPCServer.Destroy; begin Active:=False; FreeAndNil(FQueue); - FreeAndNil(FMsgData); + if Assigned(FMessage) then + FreeAndNil(FMessage); inherited Destroy; end; procedure TSimpleIPCServer.SetGlobal(const AValue: Boolean); begin - if (FGlobal<>AValue) then - begin - CheckInactive; - FGlobal:=AValue; - end; + CheckInactive; + FGlobal:=AValue; +end; + +procedure TSimpleIPCServer.SetThreaded(AValue: Boolean); +begin + CheckInactive; + FThreaded:=AValue; +end; + +procedure TSimpleIPCServer.SetThreadTimeout(AValue: Integer); +begin + CheckInactive; + FThreadTimeout:=AValue; +end; + +procedure TSimpleIPCServer.SetSynchronizeEvents(AValue: Boolean); +begin + CheckInactive; + FSynchronizeEvents:=AValue; end; procedure TSimpleIPCServer.SetMaxAction(AValue: TIPCMessageOverflowAction); begin + CheckInactive; FQueue.MaxAction:=AValue; end; procedure TSimpleIPCServer.SetMaxQueue(AValue: Integer); begin + CheckInactive; FQueue.MaxCount:=AValue; end; @@ -533,161 +725,229 @@ begin Result:=FQueue.MaxCount; end; - -function TSimpleIPCServer.GetStringMessage: String; +procedure TSimpleIPCServer.StartComm; begin - Result:=TStringStream(FMsgData).DataString; + if Assigned(FIPCComm) then + FreeAndNil(FIPCComm); + FIPCComm := CommClass.Create(Self); + FIPCComm.StartServer; end; - -procedure TSimpleIPCServer.StartServer(Threaded : Boolean = False); +procedure TSimpleIPCServer.StopComm; begin - if Not Assigned(FIPCComm) then - begin - If (FServerID='') then - FServerID:=ApplicationName; - FIPCComm:=CommClass.Create(Self); - FIPCComm.StartServer; - end; - FActive:=True; - If Threaded then - StartThread; -end; - -Type - - { TServerThread } - - TServerThread = Class(TThread) - private - FServer: TSimpleIPCServer; - FThreadTimeout: Integer; - Public - Constructor Create(AServer : TSimpleIPCServer; ATimeout : integer); - procedure Execute; override; - Property Server : TSimpleIPCServer Read FServer; - Property ThreadTimeout : Integer Read FThreadTimeout; + if Assigned(FIPCComm) then + begin + FIPCComm.StopServer; + FreeAndNil(FIPCComm); end; - -{ TServerThread } - -constructor TServerThread.Create(AServer: TSimpleIPCServer; ATimeout: integer); -begin - FServer:=AServer; - FThreadTimeout:=ATimeOut; - Inherited Create(False); end; -procedure TServerThread.Execute; +function TSimpleIPCServer.StartThread: Boolean; begin - While Not Terminated do - FServer.PeekMessage(ThreadTimeout,False); -end; - -procedure TSimpleIPCServer.StartThread; - -begin - InitCriticalSection(FLock); - FThread:=TServerThread.Create(Self,ThreadTimeOut); + FThreadError := ''; + FQueueLock := SyncObjs.TCriticalSection.Create; + FQueueAddEvent := SyncObjs.TSimpleEvent.Create; + FThreadReadyEvent := SyncObjs.TSimpleEvent.Create; + FThread := TIPCServerThread.Create(Self); + FThread.Start; + Result := WaitForReady; end; procedure TSimpleIPCServer.StopThread; - begin if Assigned(FThread) then - begin + begin FThread.Terminate; FThread.WaitFor; FreeAndNil(FThread); - DoneCriticalSection(FLock); - end; + end; + if Assigned(FThreadReadyEvent) then + FreeAndNil(FThreadReadyEvent); + if Assigned(FQueueAddEvent) then + FreeAndNil(FQueueAddEvent); + if Assigned(FQueueLock) then + FreeAndNil(FQueueLock); +end; + +function TSimpleIPCServer.WaitForReady(Timeout: Integer = -1): Boolean; +begin + if FThreadReadyEvent.WaitFor(IPCTimeoutToEventTimeout(Timeout)) = wrSignaled then + Result := FThreadExecuting + else + Result := False; +end; + +procedure TSimpleIPCServer.StartServer; +begin + StartServer(FThreaded); +end; + +procedure TSimpleIPCServer.StartServer(AThreaded: Boolean); +begin + CheckInactive; + FActive := True; + try + PrepareServerID; + FThreaded := AThreaded; + if FThreaded then + begin + if not StartThread then + raise EIPCError.Create(SErrThreadFailure); + end + else + StartComm; + except + FActive := False; + raise; + end; end; procedure TSimpleIPCServer.StopServer; begin StopThread; - If Assigned(FIPCComm) then - begin - FIPCComm.StopServer; - FreeAndNil(FIPCComm); - end; + StopComm; FQueue.Clear; - FActive:=False; + FActive := False; end; -// TimeOut values: +function TSimpleIPCServer.ProcessMessage(Timeout: Integer): Boolean; +begin + FBusy := True; + try + // Check for new messages (may push several messages to the queue) + Result := FIPCComm.PeekMessage(IPCTimeoutSanitized(Timeout)); + // Push new message to the queue (explicitly) + if Result then + FIPCComm.ReadMessage; + finally + FBusy := False; + end; +end; + +// Timeout values: // > 0 -- Number of milliseconds to wait // = 0 -- return immediately // = -1 -- wait infinitely // < -1 -- wait infinitely (force to -1) -function TSimpleIPCServer.PeekMessage(TimeOut: Integer; DoReadMessage: Boolean): Boolean; +function TSimpleIPCServer.PeekMessage(Timeout: Integer; DoReadMessage: Boolean): Boolean; begin CheckActive; - Result:=Queue.Count>0; - If Not Result then - begin - if TimeOut < -1 then - TimeOut := -1; - FBusy:=True; - Try - Result:=FIPCComm.PeekMessage(Timeout); - Finally - FBusy:=False; - end; + + if Threaded then + begin + // Check if have messages in the queue + FQueueLock.Acquire; + try + Result:=FQueue.Count>0; + // Reset queue add event + if not Result then + FQueueAddEvent.ResetEvent; + finally + FQueueLock.Release; end; + // Wait for queue add event + if not Result and (Timeout <> 0) then + Result := FQueueAddEvent.WaitFor(IPCTimeoutToEventTimeout(Timeout)) = wrSignaled; + end + else + begin + // Check if have messages in the queue + Result:=FQueue.Count>0; + // If queue is empty, process new messages via IPC driver + if not Result then + Result := ProcessMessage(Timeout); + end; + + // Read message if available (be aware of a race condition in threaded mode) If Result then If DoReadMessage then - Readmessage; + ReadMessage; +end; + +function TSimpleIPCServer.ReadMessage: Boolean; +begin + // Pop a message from the queue + Result := PopMessage; + if Result then + DoOnMessage; end; function TSimpleIPCServer.PopMessage: Boolean; - -var - MsgItem: TIPCServerMsg; - DoLock : Boolean; - begin - DoLock:=Assigned(FThread); - if DoLock then - EnterCriticalsection(Flock); + if Threaded then + FQueueLock.Acquire; try - MsgItem:=FQueue.Pop; + if Assigned(FMessage) then + FreeAndNil(FMessage); + FMessage := FQueue.Pop; + Result := Assigned(FMessage); finally - if DoLock then - LeaveCriticalsection(FLock); + if Threaded then + FQueueLock.Release; end; - Result:=Assigned(MsgItem); - if Result then - try - FMsgType := MsgItem.MsgType; - MsgItem.Stream.Position := 0; - FMsgData.Size := 0; - FMsgData.CopyFrom(MsgItem.Stream, MsgItem.Stream.Size); - finally - MsgItem.Free; - end; end; -procedure TSimpleIPCServer.ReadMessage; - +procedure TSimpleIPCServer.PushMessage(Msg: TIPCServerMsg); +var + PushFailed: Boolean; begin - CheckActive; - FBusy:=True; - Try - if (FQueue.Count=0) then - // Readmessage pushes a message to the queue - FIPCComm.ReadMessage; - if PopMessage then - If Assigned(FOnMessage) then - FOnMessage(Self); - Finally - FBusy:=False; + if Threaded then + FQueueLock.Acquire; + try + PushFailed := False; + try + // Queue.Push may throw an exception, e.g. if message queue is full. + FQueue.Push(Msg); + except + PushFailed := True; + end; + // Notify a waiting PeekMessage in threaded mode + if Threaded and not PushFailed then + FQueueAddEvent.SetEvent; + finally + if Threaded then + FQueueLock.Release; end; + + if PushFailed then + // Handler must free the Msg, because it is not owned by anybody. + DoOnMessageError(Msg) + else + DoOnMessageQueued; +end; + +function TSimpleIPCServer.GetMsgType: TMessageType; +begin + // Access to the message is not locked by design! + if Assigned(FMessage) then + Result := FMessage.MsgType + else + Result := mtUnknown; +end; + +function TSimpleIPCServer.GetMsgData: TStream; +begin + // Access to the message is not locked by design! + if Assigned(FMessage) then + Result := FMessage.Stream + else + Result := nil; end; procedure TSimpleIPCServer.GetMessageData(Stream: TStream); begin - Stream.CopyFrom(FMsgData,0); + // Access to the message is not locked by design! + if Assigned(FMessage) then + Stream.CopyFrom(FMessage.Stream, 0); +end; + +function TSimpleIPCServer.GetStringMessage: String; +begin + // Access to the message is not locked by design! + if Assigned(FMessage) then + Result := FMessage.StringMessage + else + Result := ''; end; procedure TSimpleIPCServer.Activate; @@ -700,61 +960,86 @@ begin StopServer; end; +procedure TSimpleIPCServer.DoOnMessage; +begin + if Assigned(FOnMessage) then + begin + if FSynchronizeEvents and Assigned(FThread) then + TThread.Synchronize(FThread, @InternalDoOnMessage) + else + InternalDoOnMessage; + end; +end; -procedure TSimpleIPCServer.DoMessageQueued; +procedure TSimpleIPCServer.InternalDoOnMessage; +begin + if Assigned(FOnMessage) then + FOnMessage(Self); +end; +procedure TSimpleIPCServer.DoOnMessageQueued; +begin + if Assigned(FOnMessageQueued) then + begin + if FSynchronizeEvents and Assigned(FThread) then + TThread.Synchronize(FThread, @InternalDoOnMessageQueued) + else + InternalDoOnMessageQueued; + end; +end; + +procedure TSimpleIPCServer.InternalDoOnMessageQueued; begin if Assigned(FOnMessageQueued) then FOnMessageQueued(Self); end; -procedure TSimpleIPCServer.DoMessageError; +procedure TSimpleIPCServer.DoOnMessageError(Msg: TIPCServerMsg); begin try - if Assigned(FOnMessageQueued) then - FOnMessageError(Self,FErrMsg); - finally - FreeAndNil(FErrMsg) - end; -end; - -procedure TSimpleIPCServer.PushMessage(Msg: TIPCServerMsg); - -Var - DoLock : Boolean; - -begin - try - DoLock:=Assigned(FThread); - If DoLock then - EnterCriticalsection(FLock); - try - Queue.Push(Msg); - finally - If DoLock then - LeaveCriticalsection(FLock); + if Assigned(FOnMessageError) then + begin + // Temp message (class instance variable) is used to pass + // a parameter to a synchronized thread method. + FTempMessage := Msg; + if FSynchronizeEvents and Assigned(FThread) then + TThread.Synchronize(FThread, @InternalDoOnMessageError) + else + InternalDoOnMessageError; end; - if DoLock then - TThread.Synchronize(FThread,@DoMessageQueued) - else - DoMessageQueued; - except - On E : Exception do - FErrMsg:=Msg; + finally + // Must free the message because it is not owned by anybody. + FTempMessage := nil; + FreeAndNil(Msg); end; - if Assigned(FErrMsg) then - if DoLock then - TThread.Synchronize(FThread,@DoMessageError) - else - DoMessageQueued; - end; +procedure TSimpleIPCServer.InternalDoOnMessageError; +begin + if Assigned(FOnMessageError) then + FOnMessageError(Self, FTempMessage); +end; +procedure TSimpleIPCServer.DoOnThreadError; +begin + if Assigned(FOnThreadError) then + begin + if FSynchronizeEvents and Assigned(FThread) then + TThread.Synchronize(FThread, @InternalDoOnThreadError) + else + InternalDoOnThreadError; + end; +end; -{ --------------------------------------------------------------------- - TSimpleIPCClient - ---------------------------------------------------------------------} +procedure TSimpleIPCServer.InternalDoOnThreadError; +begin + if Assigned(FOnThreadError) then + FOnThreadError(Self); +end; + +{$ENDREGION} + +{$REGION 'TSimpleIPCClient'} procedure TSimpleIPCClient.SetServerInstance(const AValue: String); begin @@ -776,7 +1061,7 @@ begin inherited Create(AOwner); end; -destructor TSimpleIPCClient.destroy; +destructor TSimpleIPCClient.Destroy; begin Active:=False; Inherited; @@ -785,7 +1070,8 @@ end; procedure TSimpleIPCClient.Connect; begin If Not assigned(FIPCComm) then - begin + begin + PrepareServerID; FIPCComm:=CommClass.Create(Self); Try FIPCComm.Connect; @@ -794,7 +1080,7 @@ begin Raise; end; FActive:=True; - end; + end; end; procedure TSimpleIPCClient.Disconnect; @@ -809,21 +1095,24 @@ begin end; function TSimpleIPCClient.ServerRunning: Boolean; - +var + TempComm: TIPCClientComm; begin If Assigned(FIPCComm) then Result:=FIPCComm.ServerRunning else - With CommClass.Create(Self) do - Try - Result:=ServerRunning; - finally - Free; - end; + begin + PrepareServerID; + TempComm := CommClass.Create(Self); + Try + Result := TempComm.ServerRunning; + finally + TempComm.Free; + end; + end; end; procedure TSimpleIPCClient.SendMessage(MsgType : TMessageType; Stream: TStream); - begin CheckActive; FBusy:=True; @@ -839,8 +1128,7 @@ begin SendStringMessage(mtString,Msg); end; -procedure TSimpleIPCClient.SendStringMessage(MsgType: TMessageType; const Msg: String - ); +procedure TSimpleIPCClient.SendStringMessage(MsgType: TMessageType; const Msg: String); Var S : TStringStream; begin @@ -864,11 +1152,14 @@ begin SendStringMessage(MsgType, Format(Msg,Args)); end; +{$ENDREGION} + {$IFDEF OSNEEDIPCINITDONE} initialization IPCInit; finalization IPCDone; -{$ENDIF} +{$ENDIF} + end. diff --git a/packages/fcl-process/src/unix/simpleipc.inc b/packages/fcl-process/src/unix/simpleipc.inc index 85b471be65..22e5ff1402 100644 --- a/packages/fcl-process/src/unix/simpleipc.inc +++ b/packages/fcl-process/src/unix/simpleipc.inc @@ -131,8 +131,6 @@ Type Private FFileName: String; FStream: TFileStream; - Protected - Procedure DoReadMessage; virtual; Public Constructor Create(AOWner : TSimpleIPCServer); override; Procedure StartServer; override; @@ -144,15 +142,6 @@ Type Property Stream : TFileStream Read FStream; end; -procedure TPipeServerComm.DoReadMessage; - -Var - Hdr : TMsgHeader; - -begin - FStream.ReadBuffer(Hdr,SizeOf(Hdr)); - PushMessage(Hdr,FStream); -end; constructor TPipeServerComm.Create(AOWner: TSimpleIPCServer); begin @@ -187,25 +176,20 @@ begin end; function TPipeServerComm.PeekMessage(TimeOut: Integer): Boolean; - Var FDS : TFDSet; - begin fpfd_zero(FDS); fpfd_set(FStream.Handle,FDS); - Result:=False; - While fpSelect(FStream.Handle+1,@FDS,Nil,Nil,TimeOut)>0 do - begin - DoReadMessage; - Result:=True; - end; + Result := fpSelect(FStream.Handle+1,@FDS,Nil,Nil,TimeOut)>0; end; procedure TPipeServerComm.ReadMessage; - +Var + Hdr : TMsgHeader; begin - DoReadMessage; + FStream.ReadBuffer(Hdr,SizeOf(Hdr)); + PushMessage(Hdr,FStream); end;