{ Multi thread Queue,witch can be used without multi-thread (C) 2014 ti_dic@hotmail.com License: modified LGPL with linking exception (like RTL, FCL and LCL) See the file COPYING.modifiedLGPL.txt, included in the Lazarus distribution, for details about the license. See also: https://wiki.lazarus.freepascal.org/FPC_modified_LGPL } unit mvJobQueue; {$mode objfpc}{$H+} interface uses Classes, SysUtils,syncobjs,contnrs,forms; const ALL_TASK_COMPLETED = -1; NO_MORE_TASK = 0; type TJobQueue = class; { TJob } TJob = Class private FLauncher: TObject; FCancelled: Boolean; FName: String; protected Queue: TJobQueue; procedure DoCancel; virtual; Procedure WaitForResultOf(aJob: TJob); Procedure EnterCriticalSection; procedure LeaveCriticalSection; //should be called inside critical section function pGetTask: integer; virtual; procedure pTaskStarted(aTask: integer); virtual; abstract; procedure pTaskEnded(aTask: integer; aExcept: Exception); virtual; abstract; property Launcher: TObject read FLauncher; public procedure ExecuteTask(aTask: integer; FromWaiting: boolean); virtual; abstract; function Running: boolean; virtual; abstract; procedure Cancel; property Cancelled: boolean read FCancelled; property Name: String read FName write FName; end; TJobArray = Array of TJob; { TJobQueue } TJobQueue = class private FMainThreadId: TThreadID; FOnIdle: TNotifyEvent; waitings: TStringList; FNbThread: integer; TerminatedThread: integer; FSect: TCriticalSection; FEvent, TerminateEvent: TEvent; FUseThreads: boolean; Threads: TList; Jobs: TObjectList; procedure pJobCompleted(var aJob: TJob); procedure SetUseThreads(AValue: boolean); procedure ClearWaitings; protected Procedure InitThreads; Procedure FreeThreads; Procedure EnterCriticalSection; procedure LeaveCriticalSection; Procedure DoWaiting(E: Exception; TaskId: integer); //Should be called inside critical section procedure pAddWaiting(aJob: TJob; aTask: integer; JobId: String); procedure pTaskStarted(aJob: TJob; aTask: integer); procedure pTaskEnded(var aJob: TJob; aTask: integer; aExcept: Exception); function pGetJob(out TaskId: integer; out Restart: boolean) : TJob; function pFindJobByName(const aName: string; ByLauncher: TObject) : TJobArray; procedure pNotifyWaitings(aJob: TJob); Function IsMainThread: boolean; public constructor Create(NbThread: integer = 5); destructor Destroy; override; procedure QueueAsyncCall(const AMethod: TDataEvent; Data: PtrInt); procedure RemoveAsyncCalls(const AnObject: TObject); procedure QueueSyncCall(const AMethod: TDataEvent; Data: PtrInt); Procedure AddJob(aJob: TJob; Launcher: TObject); function AddUniqueJob(aJob: TJob; Launcher: TObject) : boolean; function CancelAllJob(ByLauncher: TObject): TJobArray; function CancelJobByName(aJobName: String; ByLauncher: TObject): boolean; Procedure WaitForTerminate(const lstJob: TJobArray); Procedure WaitAllJobTerminated(ByLauncher: TObject); property UseThreads: boolean read FUseThreads write SetUseThreads; property OnIdle: TNotifyEvent read FOnIdle write FOnIdle; end; implementation const WAIT_TIME = 150; TERMINATE_TIMEOUT = WAIT_TIME * 2; type { EWaiting } EWaiting = class(Exception) private FLauncher: TJob; FNewJob: TJob; public constructor Create(ALauncher: TJob; ANewJob: TJob); end; { TRestartTask } TRestartTask = class(TJob) private FStarted: Boolean; FJob: TJob; FTask: integer; protected procedure DoCancel; override; procedure pTaskStarted({%H-}aTask: integer); override; procedure pTaskEnded(aTask: integer; aExcept: Exception); override; function pGetTask: integer; override; public constructor Create(aJob: TJob; aTask: integer); procedure ExecuteTask(aTask: integer; FromWaiting: boolean); override; function Running: boolean; override; end; { TQueueThread } TQueueThread = class(TThread) private MyQueue: TJobQueue; function ProcessJob: boolean; public constructor Create(aQueue: TJobQueue); procedure Execute; override; end; { TSyncCallData } TSyncCallData = Class private FMethod: TDataEvent; FData: PtrInt; public constructor Create(AMethod: TDataEvent; AData: PtrInt); procedure SyncCall; end; { TSyncCallData } constructor TSyncCallData.Create(AMethod: TDataEvent; AData: PtrInt); begin FMethod := AMethod; FData := AData; end; procedure TSyncCallData.SyncCall; begin FMethod(FData); end; { TRestartTask } procedure TRestartTask.DoCancel; begin FJob.Cancel; end; procedure TRestartTask.pTaskStarted(aTask: integer); begin FStarted := true; end; procedure TRestartTask.pTaskEnded(aTask: integer; aExcept: Exception); begin Queue.pTaskEnded(FJob, FTask, aExcept); end; function TRestartTask.pGetTask: integer; begin if FStarted then Result := inherited pGetTask else Result := 1; end; constructor TRestartTask.Create(aJob: TJob; aTask: integer); begin FJob := aJob; FTask := aTask; end; procedure TRestartTask.ExecuteTask(aTask: integer; FromWaiting: boolean); begin FJob.ExecuteTask(FTask, true); end; function TRestartTask.Running: boolean; begin Result := FStarted; end; { EWaiting } constructor EWaiting.Create(ALauncher: TJob; ANewJob: TJob); begin FLauncher := ALauncher; FNewJob := ANewJob; end; { TQueueThread } constructor TQueueThread.Create(aQueue: TJobQueue); begin MyQueue := aQueue; inherited Create(False); end; procedure TQueueThread.Execute; var wRes: TWaitResult; begin while not Terminated do begin wRes := MyQueue.FEvent.WaitFor(WAIT_TIME); if not Terminated then begin if not ProcessJob then if wRes = wrTimeout then if Assigned(MyQueue.OnIdle) then MyQueue.OnIdle(self); end; end; MyQueue.EnterCriticalSection; try inc(MyQueue.TerminatedThread); if Assigned(MyQueue.TerminateEvent) then if MyQueue.TerminatedThread=MyQueue.Threads.count then MyQueue.TerminateEvent.SetEvent; finally MyQueue.LeaveCriticalSection; end; end; function TQueueThread.ProcessJob: boolean; var aJob: TJob; TaskId: Integer; procedure SetRes(e: Exception); begin MyQueue.EnterCriticalSection; try MyQueue.pTaskEnded(aJob,TaskId,nil); finally MyQueue.LeaveCriticalSection; end; end; var RestartTask: boolean; SomeJob: Boolean; begin Result := false; Repeat SomeJob := false; MyQueue.EnterCriticalSection; try Result := Result or (MyQueue.Jobs.Count > 0); aJob := MyQueue.pGetJob(TaskId, RestartTask); if Assigned(aJob) then begin if TaskId = ALL_TASK_COMPLETED then begin MyQueue.pJobCompleted(aJob); SomeJob := true; end else begin if not(RestartTask) then MyQueue.pTaskStarted(aJob, TaskId); end; end; finally MyQueue.LeaveCriticalSection; end; if Assigned(aJob) then begin SomeJob := true; try aJob.ExecuteTask(TaskId, RestartTask); SetRes(nil); except on e: Exception do if e.InheritsFrom(EWaiting) then MyQueue.DoWaiting(e, TaskId) else SetRes(e); end; end; until not SomeJob; end; { TJobQueue } constructor TJobQueue.Create(NbThread: integer); begin waitings := TStringList.Create; FNbThread := NbThread; FMainThreadId := GetCurrentThreadId; end; destructor TJobQueue.Destroy; begin FreeThreads; ClearWaitings; FreeAndNil(Waitings); inherited; end; procedure TJobQueue.SetUseThreads(AValue: boolean); begin if FUseThreads = AValue then Exit; FUseThreads := AValue; if FUsethreads then InitThreads else FreeThreads; end; procedure TJobQueue.ClearWaitings; var i: integer; begin for i := 0 to pred(Waitings.count) do Waitings.Objects[i].Free; Waitings.Clear; end; procedure TJobQueue.InitThreads; var i: integer; begin Jobs := TObjectList.Create(true); Threads := TObjectList.Create(true); FEvent := TEvent.Create(Nil, False, False,''); FSect := TCriticalSection.Create; TerminatedThread := 0; for i:=1 to FNbThread do Threads.Add(TQueueThread.Create(self)); end; procedure TJobQueue.FreeThreads; var i: integer; begin if Assigned(Threads) then begin TerminateEvent := TEvent.Create(nil, false, false, ''); try TerminatedThread := 0; for i := 0 to Pred(Threads.Count) do TQueueThread(Threads[i]).Terminate; for i := 0 to Pred(Threads.Count) do FEvent.SetEvent; TerminateEvent.WaitFor(TERMINATE_TIMEOUT); for i := 0 to Pred(Threads.Count) do TQueueThread(Threads[i]).WaitFor; FreeAndNil(FSect); FreeAndNil(FEvent); FreeAndNil(Threads); finally FreeAndNil(TerminateEvent); end; FreeAndNil(Jobs); end; end; procedure TJobQueue.EnterCriticalSection; begin if Assigned(FSect) and UseThreads then FSect.Enter; end; procedure TJobQueue.LeaveCriticalSection; begin if Assigned(FSect) and UseThreads then FSect.Leave; end; procedure TJobQueue.DoWaiting(E: Exception; TaskId: integer); var we: EWaiting; begin EnterCriticalSection; try we := EWaiting(e); pAddWaiting(we.FLauncher, TaskId, we.FNewJob.Name); AddUniqueJob(we.FNewJob, we.FLauncher.FLauncher); finally LeaveCriticalSection; end; end; procedure TJobQueue.pAddWaiting(aJob: TJob; aTask: integer; JobId: String); begin Waitings.AddObject(JobId, TRestartTask.Create(aJob, aTask)); end; procedure TJobQueue.pTaskStarted(aJob: TJob; aTask: integer); begin aJob.pTaskStarted(aTask); end; procedure TJobQueue.pJobCompleted(var aJob: TJob); Begin pNotifyWaitings(aJob); if FuseThreads then begin Jobs.Remove(aJob); aJob := nil; end else FreeAndNil(aJob); end; procedure TJobQueue.pTaskEnded(var aJob: TJob; aTask: integer; aExcept: Exception); begin aJob.pTaskEnded(aTask, aExcept); if (aJob.pGetTask = ALL_TASK_COMPLETED) then pJobcompleted(aJob); end; function TJobQueue.pGetJob(out TaskId: integer; out Restart: boolean): TJob; var iJob: integer; aJob: TJob; begin Restart := false; Result := nil; for iJob := 0 to pred(Jobs.Count) do begin aJob := TJob(Jobs[iJob]); if aJob.InheritsFrom(TRestartTask) then begin Result := TRestartTask(aJob).FJob; TaskId := TRestartTask(aJob).FTask; Restart := true; Jobs.Delete(iJob); exit; end; TaskId := aJob.pGetTask; if (TaskId>NO_MORE_TASK) or (TaskId=ALL_TASK_COMPLETED) then begin Result := aJob; Exit; end; end; if not Assigned(Result) then TaskId := NO_MORE_TASK; end; function TJobQueue.pFindJobByName(const aName: string; ByLauncher: TObject): TJobArray; var iRes, i: integer; begin Result := nil; SetLength(Result, Jobs.Count); iRes := 0; for i := 0 to pred(Jobs.Count) do begin if TJob(Jobs[i]).Name = aName then begin if (ByLauncher = nil) or (TJob(Jobs[i]).FLauncher = ByLauncher) then begin Result[iRes] := TJob(Jobs[i]); inc(iRes); end; end; end; SetLength(Result, iRes); end; procedure TJobQueue.pNotifyWaitings(aJob: TJob); var JobId: String; ObjRestart: TRestartTask; idx: integer; begin JobId := aJob.Name; repeat idx := waitings.IndexOf(JobId); if idx <> -1 then begin ObjRestart := TRestartTask(waitings.Objects[idx]); waitings.Delete(idx); Jobs.Add(ObjRestart); end; until idx = -1; end; function TJobQueue.IsMainThread: boolean; begin Result := (GetCurrentThreadId = FMainThreadID); end; procedure TJobQueue.QueueAsyncCall(const AMethod: TDataEvent; Data: PtrInt); begin if UseThreads then Application.QueueAsyncCall(aMethod,Data) else AMethod(Data); end; procedure TJobQueue.RemoveAsyncCalls(const AnObject: TObject); begin Application.RemoveAsyncCalls(AnObject); end; procedure TJobQueue.QueueSyncCall(const AMethod: TDataEvent; Data: PtrInt); var tmp: TSyncCallData; begin tmp := TSyncCallData.Create(AMethod,Data); try TThread.Synchronize(nil, @tmp.SyncCall); finally tmp.Free; end; end; procedure TJobQueue.AddJob(aJob: TJob; Launcher: TObject); var TaskId: Integer; restart: boolean; begin aJob.FLauncher := Launcher; aJob.Queue := self; if Usethreads then begin EnterCriticalSection; try Jobs.Add(aJob); finally LeaveCriticalSection; end; FEvent.SetEvent; end else begin try repeat TaskId := aJob.pGetTask; restart := false; if TaskId > NO_MORE_TASK then begin pTaskStarted(aJob, TaskId); try aJob.ExecuteTask(TaskId, restart); pTaskEnded(aJob,TaskId, nil); except on e: Exception do begin if not e.InheritsFrom(EWaiting) then pTaskEnded(aJob, TaskId, e) else DoWaiting(e, TaskId); end; end; end; if not Assigned(aJob) then TaskId := ALL_TASK_COMPLETED; until TaskId = ALL_TASK_COMPLETED; finally aJob.Free; end; end; end; function TJobQueue.AddUniqueJob(aJob: TJob; Launcher: TObject): boolean; var lst: TJobArray; begin Result := true; if FUseThreads then begin aJob.Queue := self; aJob.FLauncher := Launcher; EnterCriticalSection; try lst := pFindJobByName(aJob.Name, Launcher); if Length(lst) = 0 then Jobs.Add(aJob) else Result := false; finally LeaveCriticalSection; end; FEvent.SetEvent;; end else AddJob(aJob,Launcher); end; function TJobQueue.CancelAllJob(ByLauncher: TObject): TJobArray; var i, iJob: integer; begin Result := nil; if FUseThreads then begin EnterCriticalSection; try SetLEngth(Result, Jobs.Count); iJob := 0; for i := pred(Jobs.Count) downto 0 do begin if (ByLauncher = nil) or (TJob(Jobs[i]).FLauncher = ByLauncher) then begin TJob(Jobs[i]).Cancel; Result[iJob] := TJob(Jobs[i]); iJob += 1; end; end; SetLength(Result, iJob); finally LeaveCriticalSection; end; end; end; function TJobQueue.CancelJobByName(aJobName: String; ByLauncher: TObject): boolean; var lst: TJobArray; i: integer; begin Result := false; if FUseThreads then begin EnterCriticalSection; try lst := pFindJobByName(aJobName, ByLauncher); for i := Low(lst) to High(lst) do begin Result := true; lst[i].Cancel; end; finally LeaveCriticalSection; end; end; end; procedure TJobQueue.WaitForTerminate(const lstJob: TJobArray); var OneFound: Boolean; i: integer; mThread: Boolean; TimeOut: integer; begin TimeOut := 0; mThread := IsMainThread; if FUseThreads then begin repeat OneFound := False; EnterCriticalSection; try for i := Low(lstJob) to High(lstJob) do begin if Jobs.IndexOf(lstJob[i]) <> -1 then begin OneFound := True; break; end; end; finally LeaveCriticalSection; end; if OneFound and (TimeOut > 200) then raise Exception.Create('TimeOut'); if mThread then Application.ProcessMessages; if OneFound then Sleep(100); Inc(TimeOut); until not OneFound; end; end; procedure TJobQueue.WaitAllJobTerminated(ByLauncher: TObject); var OneFound: boolean; i: integer; TimeOut: integer; mThread: Boolean; procedure CheckTimeOut; begin if TimeOut > 200 then raise Exception.Create('TimeOut'); if mThread then Application.ProcessMessages; Sleep(100); inc(TimeOut); end; begin TimeOut := 0; if FUseThreads then begin mThread := IsMainThread; if ByLauncher = nil then begin while Jobs.Count > 0 do CheckTimeOut; end else begin repeat OneFound := False; EnterCriticalSection; try for i := 0 to pred(Jobs.Count) do begin if TJob(Jobs[i]).FLauncher = ByLauncher then begin OneFound := True; break; end; end; finally LeaveCriticalSection; end; if OneFound then CheckTimeOut; until not OneFound; end; end; end; { TJobQueue } procedure TJob.Cancel; var lst: Array of TRestartTask = nil; i, idx: integer; begin Queue.EnterCriticalSection; try FCancelled := true; if (Name <> '') and (Queue.waitings.Count > 0) then begin SetLength(lst, 0); repeat idx := Queue.waitings.IndexOf(Name); if idx <> -1 then begin SetLength(lst, Length(lst)+1); lst[High(lst)] := TRestartTask(Queue.waitings.Objects[idx]); Queue.waitings.Delete(idx); end; until idx = -1; for i := Low(lst) to High(lst) do begin lst[i].Cancel; lst[i].pTaskEnded(1, nil); lst[i].Free; end; end; DoCancel; finally Queue.LeaveCriticalSection; end; end; procedure TJob.DoCancel; begin // end; function TJob.pGetTask: integer; begin result := ALL_TASK_COMPLETED; end; procedure TJob.WaitForResultOf(aJob: TJob); begin raise EWaiting.Create(self,aJob); end; procedure TJob.EnterCriticalSection; begin Queue.EnterCriticalSection; end; procedure TJob.LeaveCriticalSection; begin Queue.LeaveCriticalSection; end; end.