Unit Heartbeat;

// Implements the 'Heartbeat' alert.  Everything lives in the implementation
// section; the unit makes itself available by registering THeartbeat with
// TGenericDataNodeFactory in its initialization section.

Interface

Implementation

Uses
  DataNodes, GenericDataNodes, AlertBase, StringIntegerHashTables, Timers,
  GenericTosDataNode, DebugOutput, SysUtils;

////////////////////////////////////////////////////////////////////////
// THeartbeatDispatch
////////////////////////////////////////////////////////////////////////

Const
  PeriodInMinutes = 5;
  PeriodInSeconds = PeriodInMinutes * 60.0;
  // Prefix used to build the broadcast channel name for a symbol.  The
  // channel name is also the key used in ChannelsBySymbol, so AddSymbol and
  // RemoveSymbol must both build it the same way.
  ChannelPrefix = 'THeartbeatDispatch.';

// This is optimized with the assumption that you will start from nothing,
// quickly add a lot of symbols, let it run for a while, then quickly
// delete all of the symbols.  This will still work in other cases.
// We pause when something is added or deleted, so we don't waste a lot of
// time or effort while the main program is in the start and stop routines.
// If we expected other use cases, we could be more careful about the way
// we notify listeners after a deletion.  The current algorithm allows for
// some items to be called sooner than expected after this.
//
// This assumption is not so much for run time speed as for making the
// code simpler.

Type
  THeartbeatDispatch = Class(TDataNodeWithStringKey)
  Private
    // Channel names in the order they are visited.  Only the first
    // ChannelsBySymbol.Count entries are meaningful; the array is grown in
    // chunks by AddSymbol and never shrunk.
    ChannelsInReportOrder : Array Of String;
    // Maps a channel name to its index in ChannelsInReportOrder.
    ChannelsBySymbol : THashTable;
    Timer : ITimerEvent;
    // Index of the next channel to notify; wraps around in
    // OnTimerInDataNodeThread.
    NextIndex : Integer;
    // Fractional number of notifications carried over between timer ticks.
    SavedItemsToSend : Double;
    // Set by AddSymbol / RemoveSymbol; causes the next timer tick to be
    // skipped.
    Pause : Boolean;
    Procedure OnTimer;
    Procedure OnTimerInDataNodeThread;
    Constructor Create;
    Destructor Destroy; Override;
  Protected
    Class Function CreateNew(Data : String) : TDataNodeWithStringKey; Override;
  Public
    Class Procedure Find(Out Node : THeartbeatDispatch; Out Link : TDataNodeLink);
    // Call these from the data node thread!
    Function AddSymbol(Symbol : String) : String; // Duplicates not allowed.
    Procedure RemoveSymbol(Symbol : String); // If something was not added, this does not hurt.
  End;

// Timer callback.  Skips one tick after any add / remove (see Pause),
// otherwise hands the real work to OnTimerInDataNodeThread.
Procedure THeartbeatDispatch.OnTimer;
Begin
  If Pause Then
    Pause := False
  Else
    // This is more efficient because we put one item on the queue every
    // second, rather than putting each event on the queue separately.  But
    // that's just an added bonus.  The real benefit to this is that we
    // can avoid a critical section.
    DoInCorrectThread(OnTimerInDataNodeThread)
End;

// Sends this tick's share of heartbeat messages.  Each tick adds
// TotalCount / PeriodInSeconds to a running total, so that with one tick per
// second every channel is visited about once per period.  The rounded part
// of the total is sent now and the remainder is carried to the next tick.
Procedure THeartbeatDispatch.OnTimerInDataNodeThread;
Var
  TotalCount : Integer;
  ItemsThisTime : Integer;
  I : Integer;
  Info : TBroadcastMessage;
Begin
  TotalCount := ChannelsBySymbol.count;
  If TotalCount > 0 Then
    Begin
      SavedItemsToSend := SavedItemsToSend + TotalCount / PeriodInSeconds;
      ItemsThisTime := Round(SavedItemsToSend);
      SavedItemsToSend := SavedItemsToSend - ItemsThisTime;
      // Clamp: never a negative count, and never more than one full pass
      // over the channels in a single tick.
      If ItemsThisTime < 0 Then
        ItemsThisTime := 0
      Else If ItemsThisTime > TotalCount Then
        ItemsThisTime := TotalCount;
      For I := 1 To ItemsThisTime Do
        Begin
          // Wrap around.  This also recovers when NextIndex was left past
          // the end because symbols were removed.
          If NextIndex >= TotalCount Then
            NextIndex := 0;
          Info := TBroadcastMessage.Create;
          // NOTE(review): the meaning of the second argument is defined by
          // TBroadcastMessage.Send, which is not visible in this unit.
          Info.Send(ChannelsInReportOrder[NextIndex], True);
          Inc(NextIndex)
        End;
    End
End;

// Creates the lookup table and asks for OnTimer to be called periodically
// (interval argument 1000; the rate computation above assumes one call per
// second).
Constructor THeartbeatDispatch.Create;
Begin
  Inherited Create;
  ChannelsBySymbol := THashTable.Create;
  Timer := RequestPeriodicCallback(OnTimer, 1000)
End;

// Clears the timer before freeing the table that the timer callbacks read.
Destructor THeartbeatDispatch.Destroy;
Begin
  Timer.Clear;
  ChannelsBySymbol.Free;
  Inherited
End;

// Factory hook called through FindCommon.  Data is ignored.
Class Function THeartbeatDispatch.CreateNew(Data : String) : TDataNodeWithStringKey;
Begin
  Result := THeartbeatDispatch.Create
End;

// Looks up the dispatcher through FindCommon, always using the empty string
// as the key, and returns it along with the link to it.
Class Procedure THeartbeatDispatch.Find(Out Node : THeartbeatDispatch; Out Link : TDataNodeLink);
Var
  TempNode : TDataNodeWithStringKey;
Begin
  FindCommon(THeartbeatDispatch, '', Nil, TempNode, Link);
  Node := TempNode As THeartbeatDispatch
End;

// Registers Symbol and returns the name of the broadcast channel on which
// its heartbeats will be sent.  Adding the same symbol twice trips the
// assertion.
Function THeartbeatDispatch.AddSymbol(Symbol : String) : String;
Var
  Count : Integer;
  AddedNewValue : Boolean;
Begin
  Pause := True;
  // Return value is the channel.
  Result := ChannelPrefix + Symbol;
  Count := ChannelsBySymbol.count;
  // Start with 8 slots, then double whenever the array is full.
  If Length(ChannelsInReportOrder) < 8 Then
    SetLength(ChannelsInReportOrder, 8)
  Else If Length(ChannelsInReportOrder) <= Count Then
    SetLength(ChannelsInReportOrder, 2 * Length(ChannelsInReportOrder));
  ChannelsInReportOrder[Count] := Result;
  AddedNewValue := ChannelsBySymbol.setValue(Result, Count);
  Assert(AddedNewValue)
End;

// Unregisters Symbol.  Does nothing (other than pausing one tick) if the
// symbol was never added.
Procedure THeartbeatDispatch.RemoveSymbol(Symbol : String);
Var
  Index : Integer;
  LastIndex : Integer;
  MovedChannel : String;
Begin
  Pause := True;
  // The table is keyed by channel name (see AddSymbol), not by the bare
  // symbol, so the key has to be rebuilt the same way here.
  Index := ChannelsBySymbol.remove(ChannelPrefix + Symbol);
  If Index <> NoIntegerValue Then
    Begin
      // The count has already dropped by one, so this is the slot of what
      // was the last channel before the removal.
      LastIndex := ChannelsBySymbol.Count;
      If Index <> LastIndex Then
        Begin
          // Fill the hole with the last channel and record its new position
          // so that a later removal of that channel uses the right slot.
          // Assumes setValue overwrites the value of an existing key.
          MovedChannel := ChannelsInReportOrder[LastIndex];
          ChannelsInReportOrder[Index] := MovedChannel;
          ChannelsBySymbol.setValue(MovedChannel, Index)
        End
    End
End;

////////////////////////////////////////////////////////////////////////
// THeartbeat
////////////////////////////////////////////////////////////////////////

Var
  // Starting EventCount for the next THeartbeat to be initialized.  Each
  // alert takes the current value and increments it, so successive alerts
  // start at different positions in BaseQuality.
  NextEventCount : Integer;
  // Quality table filled in by InitBaseQuality.
  BaseQuality : Array [0..127] Of Integer;

Type
  // Alert for a single symbol.  On each heartbeat from the dispatcher it
  // reports, provided a print has been seen for the symbol recently enough
  // (see TimerCallback).
  THeartbeat = Class(TAlert)
  Protected
    Constructor Create(Params : TParamList); Override;
  Private
    Dispatcher : THeartbeatDispatch;
    // Number of heartbeats seen so far, offset by the starting value taken
    // from NextEventCount.  Selects the entry of BaseQuality to use.
    EventCount : Integer;
    Symbol : String;
    TosDataNode : TGenericTosDataNode;
    // Submit time recorded when the most recent print arrived.
    LastPrintTime : TDateTime;
    Procedure TimerCallback(Msg : TBroadcastMessage; Owner : TDataNode);
    Procedure NewTosData;
    Procedure Initialize;
    Destructor Destroy; Override;
  Public
  End;

// Params must contain exactly one entry, the symbol.  The rest of the setup
// is deferred to Initialize via DoInCorrectThread.
Constructor THeartbeat.Create(Params : TParamList);
Begin
  Assert(Length(Params) = 1);
  Symbol := Params[0];
  Inherited Create;
  DoInCorrectThread(Initialize)
End;

// Picks the starting event count, registers the symbol with the dispatcher
// and listens on the returned channel, then hooks up the TOS data for the
// symbol.
Procedure THeartbeat.Initialize;
Var
  Link : TDataNodeLink;
Begin
  EventCount := NextEventCount;
  Inc(NextEventCount);
  THeartbeatDispatch.Find(Dispatcher, Link);
  AddAutoLink(Link);
  RegisterForBroadcast(Dispatcher.AddSymbol(Symbol), TimerCallback);
  TGenericTosDataNode.Find(Symbol, NewTosData, TosDataNode, Link);
  AddAutoLink(Link);
  Link.SetReceiveInput(True)
End;

// Called when the TOS data changes.  Remembers the submit time whenever the
// latest valid record is a new print.
Procedure THeartbeat.NewTosData;
Var
  Last : PTosData;
Begin
  If TosDataNode.IsValid Then
    Begin
      Last := TosDataNode.GetLast;
      If Last^.EventType = etNewPrint Then
        LastPrintTime := GetSubmitTime {Last^.Time}
    End
End;

// Heartbeat from the dispatcher.  Reports only if the last print is no older
// than TimeOut.  The event count advances on every heartbeat, whether or not
// a report is made.
Procedure THeartbeat.TimerCallback(Msg : TBroadcastMessage; Owner : TDataNode);
Const
  // LastPrintTime is a TDateTime, which counts days, so this is one hour
  // (assuming GetSubmitTime uses the same units).
  TimeOut = 1.0 / 24.0;
Var
  Quality : Double;
  BetweenQuality : Double;
Begin
  If (GetSubmitTime - LastPrintTime) <= TimeOut Then
    Begin
      // Random is in [0, 1), so this factor is in [1, 2).
      BetweenQuality := 2 / (2 - Random);
      Quality := BaseQuality[EventCount Mod Length(BaseQuality)] * BetweenQuality;
      Report('All filters satisfied.', Quality)
    End;
  Inc(EventCount)
End;

// Unregisters the symbol.  Dispatcher is unassigned if Initialize never ran.
Destructor THeartbeat.Destroy;
Begin
  If Assigned(Dispatcher) Then
    Dispatcher.RemoveSymbol(Symbol);
  Inherited
End;

////////////////////////////////////////////////////////////////////////
// Initialization
////////////////////////////////////////////////////////////////////////

// Fills BaseQuality.  Entry I is PeriodInMinutes doubled once for every
// consecutive 1 bit at the low end of I: 5 for even I, 10 when I ends in
// binary 01, 20 when it ends in 011, and so on.
Procedure InitBaseQuality;
Var
  I : Cardinal;
  Modified : Integer;
Begin
  For I := Low(BaseQuality) To High(BaseQuality) Do
    Begin
      BaseQuality[I] := PeriodInMinutes;
      Modified := I;
      While (Modified Mod 2) = 1 Do
        Begin
          Modified := Modified ShR 1;
          BaseQuality[I] := BaseQuality[I] ShL 1
        End
    End
End;

Initialization
  Randomize;
  InitBaseQuality;
  TGenericDataNodeFactory.StoreStandardFactory('Heartbeat', THeartbeat);
End.