Unit DataNodes;

{ This unit provides a standardized way to build complicated data structures
  which update in real time.

  To use a data structure, just call its Find procedure. This might create a
  new data structure, or it might point to one which already exists. Find
  will also give you a receipt for your request, called a link. Use this link
  to enable and disable the callback messages from the data structure.
  Initially the link is disabled; typically you will create several links,
  then turn them all on after you have finished the rest of your
  initialization. The link is also used to permanently sever your connection
  to the data structure. The data structure keeps a count of active links,
  and automatically releases all resources after the last link has been
  severed.

  Internally, most data nodes are implemented using other data nodes. The
  user does not have to know this. The data structure will automatically
  find and release any sub data structures that it needs. The result is that
  each data structure is typically a node in an acyclic graph of data
  structures. Hence the name DataNodes.

  This unit creates a thread called the DataNode thread. All notifications
  come in this thread. Except as noted, data nodes should only be accessed in
  this thread. Special methods allow you to introduce external events into
  this thread. Some nodes specifically allow the users to access data from
  other threads.

  Most subclasses will not have to deal with the DataNode thread directly.
  Regardless of the ultimate source and sink for the data, most data nodes
  will be written as if the entire application were single threaded.

  New data can be brought into the DataNode thread in three ways. The
  preferred method is to get data from another node. This data is
  automatically sent in the right thread. The second method is the procedure
  AddToThreadQueue. This is mostly used when a data node is notified in the
  wrong thread, and it wants to wake itself in the right thread.
  The final method is the use of broadcast messages. This allows a library
  to publish data without regard to who is listening.

  This unit only provides the infrastructure for these nodes. This includes
  the methods for finding and reusing data nodes, the methods for releasing
  data nodes when they are done, the methods for notifying listeners, and the
  thread in which these nodes do most of their work. However, this unit does
  not actually have any data. That is provided by subclasses. }

Interface

Uses
  StandardHashTables, SyncObjs, Classes, AppEvnts;

Type
  { This is primarily for internal use. Older code which uses this should
    probably be converted to use broadcasts, instead. }
  TListenerId = Int64;

  TDataNodeLink = Class;
  TDataNodeLinkArray = Array Of TDataNodeLink;
  TBroadcastMessage = Class;
  TDataNode = Class;

  { This is the signature for a broadcast method. This is a very generic
    mechanism, so it has more than most people need. The Msg parameter is
    the message which was sent, and Owner is the data node which created the
    subscription. }
  TBroadcastCallback = Procedure(Msg : TBroadcastMessage;
                                 Owner : TDataNode) Of Object;

  TPriority = (pHigh,    { Before any new data. Often called from the event
                           thread to queue up something as soon as the
                           current event is done. }
               pNormal); { All incoming data. Over 99% of all events. }
  { pLow - Maintenance or cleanup task. This isn't really used. Ideally
    that's how we'd delete old data nodes, but this could cause a problem.
    If we are overloaded, and can't catch up with the live data, we might
    never get the instruction to turn off the data. This is especially bad
    since the marketview will start the new data before turning off the old
    data. }

  { DataNode is the base class for all data structures which notify
    listeners of changes in real time. }
  TDataNode = Class(TObject)
  Private
    CriticalSection : TCriticalSection; // This protects the list of
                                        // listeners / links.
                                        // To read either list you
                                        // must own this critical
                                        // section.
                                        // To modify it, you must own this
                                        // and the create/destroy lock.
    // To access this critical section you must own the create/destroy
    // critical section, be in the data node thread, or be in a special
    // callback from the deferred update / screen updater data nodes.
    Listeners : TIntegerObjectHashTable; // The people we may have to notify.
    OtherOwners : TIntegerObjectHashTable; // Links to us which have no
                                           // callbacks. These were
                                           // separated from Listeners
                                           // strictly as an optimization.
    Immortal : Boolean; // Most data nodes are automatically destroyed
                        // when the last listener goes away. Immortals
                        // are not.
    FListenerId : TListenerId; // This is used internally to keep track
                               // of AddToThreadQueue requests.
    BroadcastChannels : TStringList; // These are the channels we are
                                     // currently listening to. We keep
                                     // track of these to prevent someone
                                     // from subscribing twice (which
                                     // would break some data structures)
                                     // and to automatically remove all
                                     // open connections when we shut down.
    Procedure AddLink(Link : TDataNodeLink); // One more listener.
    Function ReadyForRelease : Boolean; // This is true if there are no
                                        // listeners or other reasons
                                        // to live.
    Procedure Release; // This does the second part of the cleanup, the
                       // part which is done in the data node thread.
    Class Procedure DoInGuiThread(ToDo : TThreadMethod);
  Protected
    { This severs the connection to the given link. It should only be
      called by TDataNodeLink. If this is the last link, it will cause the
      data node to release its resources. }
    Procedure RemoveLink(Link : TDataNodeLink);

    { This notifies all active listeners. This should only be called in the
      data node thread.

      It is safe to call this method after this object has been shut down
      (or while it is in the process of being shut down). In that case,
      there are no listeners, so this is a no-op. The standard shutdown
      FIRST removes all listeners, THEN removes any subordinate data nodes.
      It is safe to call this in another thread only in the context of the
      deferred update / screen updater data node. It is always safe to call
      this if you are handling a call from a subordinate who called this to
      get to you. }
    Procedure NotifyListeners;

    { This makes the given data node a listener to the given channel.
      Attempting to subscribe the same data node to the same channel more
      than once is an error. However, it is acceptable to unsubscribe then
      resubscribe. This can only be called from the data node thread. }
    Procedure RegisterForBroadcast(Channel : String;
                                   Callback : TBroadcastCallback);

    { This removes the subscription. This should only be called from the
      data node thread. This can safely be called even if there is no such
      subscription. All subscriptions are automatically removed before
      CleanupInThread is called. }
    Procedure UnRegisterForBroadcast(Channel : String);

    { This must be called to initialize some private member variables.
      Setting Immortal to true means that we never clean up the data node,
      even if it has no listeners. }
    Constructor Create(Immortal : Boolean = False);

    { This adds ToDo to the thread queue. ToDo will be called in the data
      node thread. This method can be called from any thread. The queue is
      a FIFO. The request will automatically be cancelled if the data node
      is destroyed before ToDo is called. This is especially useful when the
      data node subscribes to a service which is not savvy to data nodes. }
    Procedure AddToThreadQueue(ToDo : TThreadMethod;
                               Priority : TPriority = pNormal);

    { This is used internally to implement AddToThreadQueue and the
      broadcast services. It should be private. Currently it is also used
      by some old code which should be changed to use broadcast services.
      ToDo is any action, which will be executed as described above.
      ListenerId is used to cancel a job if the listener is destroyed before
      the job has started. If ListenerId is -1 (the default) then the job
      will never be canceled.
    }
    Class Procedure AddToThreadQueue_(ToDo : TThreadMethod;
                                      Priority : TPriority = pNormal;
                                      ListenerId : TListenerId = -1);

    { This calls ToDo immediately if we are already in the data node thread,
      otherwise it adds ToDo to the thread queue. }
    Procedure DoInCorrectThread(ToDo : TThreadMethod;
                                Priority : TPriority = pNormal);

    { This critical section is available to make sure that two calls to Find
      are done in serial. Try to avoid using this outside of create and
      destroy procedures. We know that it takes more time to create and
      destroy data nodes than it takes for normal processing. }
    Class Function GetCreateDestroyLock : TCriticalSection;

    { This creates a new listener. It returns a data node link as the
      receipt. The receipt can be used to suspend or terminate the
      connection to this data node. OnChange is called whenever the data in
      this data node changes, except when the link is suspended or
      terminated. The callback can be Nil; that allows you to request data
      whenever you want, not when it changes.

      Initially the link is suspended. That is done for thread safety
      reasons. A user will typically create all data nodes THEN start
      listening to the data nodes. Otherwise the user could get an update
      message before he has finished initializing himself. }
    Function CreateLink(OnChange : TThreadMethod) : TDataNodeLink;

    { Automatic links create simpler code. They are not strictly required,
      but they solve a lot of problems, especially when creating class
      hierarchies of data nodes. If you add a link to this list in the
      constructor, it will automatically be enabled in the AfterConstruction
      method. All items in this list will be disabled as soon as the last
      link to this data node is released, before CleanupImmediately. All
      items in this list will be released in the destructor.

      Currently this list is not optimized for long lists. Each call to
      AddAutoLink grows the array by one, so adding n links can take
      n-squared time.
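
      A minimal sketch of the pattern, not taken from any real node:
      TMyNode and its Tick method are hypothetical, while
      TScreenUpdater.FindBeforeGuiThread is declared later in this unit.

        Constructor TMyNode.Create;
        Var
          Link : TDataNodeLink;
        Begin
          Inherited Create;
          // Find hands back the link in its initial, disabled state.
          TScreenUpdater.FindBeforeGuiThread(Tick, Link);
          // AfterConstruction enables the link and the destructor
          // releases it, so the constructor does neither.
          AddAutoLink(Link)
        End;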
    }
  Private
    AutomaticLinks : TDataNodeLinkArray;
    Procedure SetAutoLinksActive(Active : Boolean);
    Procedure ReleaseAutoLinks;
  Protected
    Procedure AddAutoLink(Link : TDataNodeLink);
  Public
    Procedure AfterConstruction; Override;
  Public
    Class Function GetSubmitTime : TDateTime;
    Class Procedure SetSubmitTime(Time : TDateTime);

    { This does important cleanup, such as making sure that no one can call
      the data node from below. It also frees memory. This is called from
      within the CreateDestroyLock, and in the data node thread. The
      CriticalSection is obsolete by this time and should not be used. Data
      nodes should override this procedure to do their own cleanup, but then
      call inherited at the end of their destroy methods. }
    Destructor Destroy; Override;
  End;

  { This is the intermediary between the data node and its user. Find
    returns a data node and one of these. Use this to control the
    connection between the user and the data node. }
  TDataNodeLink = Class(TObject)
  Private
    ReceiveInput : Boolean;
    Source : TDataNode;
    Callback : TThreadMethod;
    Constructor Create(Source : TDataNode; OnChange : TThreadMethod);
    Procedure NotifyListener;
  Protected
  Public
    { True will allow you to receive notifications each time the
      corresponding data node changes. False will stop you from receiving
      notifications. If a notification is in progress when you say
      SetReceiveInput(False), this method will block until that notification
      is finished. Initially the state is false. This only controls the
      notification messages; the user can still poll for data at his
      leisure. }
    Procedure SetReceiveInput(B : Boolean);

    { This method tells the corresponding data node that you no longer need
      to use it. After calling this method, the user may no longer access
      this object, or the corresponding data node. This will automatically
      disable notifications from the data node. If a notification from the
      data node is in progress, this method will block until the
      notification is finished.
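
      A typical lifetime of a link, as a sketch. RepaintNow is a
      hypothetical method of the caller; FindInGuiThread is declared on
      TScreenUpdater later in this unit.

        TScreenUpdater.FindInGuiThread(RepaintNow, Link); // Starts disabled.
        // ... finish the rest of the initialization ...
        Link.SetReceiveInput(True);  // Notifications may arrive from now on.
        // ... later, when the updates are no longer needed ...
        Link.Release;                // Do not use Link after this call.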
    }
    Procedure Release;
  End;

  { This class is the basis of a pub/sub method of sending data to data
    nodes. Data can be published in any thread, but it will always be
    received in the data node thread. Simple messages can be sent directly
    from this class. Subclasses can be created to hold a payload.

    The primary purpose of this class is to deal with a data source which
    might keep sending data after the listener is gone; most channels have
    0 or 1 listeners. However, this works with any number of listeners.
    Channels are all kept in a hash table, so this should be efficient with
    a large number of channels. Each channel keeps its listeners in a hash
    table, so it will be efficient with a large number of listeners. }
  TBroadcastMessage = Class(TObject)
  Private
    FChannel : String;
    Procedure Deliver;

    { This subscribes the given node to the given channel. It is an error
      to subscribe a node to a channel if the node is already listening to
      that channel. (Unsubscribing then resubscribing is allowed.) This is
      private because only TDataNode.RegisterForBroadcast should access it.
      RegisterForBroadcast makes some assertions which protect the
      underlying data structure. }
    Class Procedure ListenTo(Channel : String;
                             Node : TDataNode;
                             Callback : TBroadcastCallback);

    { This reverses the effects of ListenTo. It is private so that
      TDataNode.UnRegisterForBroadcast can test the required assertions. }
    Class Procedure StopListening(Channel : String; Node : TDataNode);
  Protected
    { This method is called after this message has been delivered to any
      and all listeners. The default implementation is to free the
      message. }
    Procedure AfterDelivery; Virtual;

    { This method is called immediately before this message is delivered to
      the first listener. The default implementation is empty. }
    Procedure BeforeDelivery; Virtual;

    { This is called once for each listener. It is always called in the
      data node thread. Node and Callback are copied from the ListenTo
      procedure.
      The default implementation is to call the callback. This is good for
      passive messages. The assumption is that the listener knows how to
      deal with the data. However, the message itself could take an active
      role and ignore the callback. }
    Procedure DeliverTo(Node : TDataNode;
                        Callback : TBroadcastCallback); Virtual;
  Public
    { This is the token that a listener uses to find this message. }
    Property Channel : String Read FChannel;

    { This puts the message into the data node thread's queue. By default,
      one thread creates this object, optionally sets some fields, then
      calls Send. As soon as the thread calls Send, it should never touch
      the object again. Another thread could free the object at any time.
      If Immediately is true and we are in the data node thread, we notify
      the listeners synchronously. Otherwise we just add the request to the
      queue. }
    Procedure Send(Channel : String;
                   Immediately : Boolean = False;
                   Priority : TPriority = pNormal);

    { This sends the message to a reserved channel that no one listens to.
      This is a way to use this class to send work to the data node thread.
      Immediately=false, so this can be used to do work later. }
    Procedure NullSend(Priority : TPriority = pNormal);
  End;

Type
  TDataNodeWithStringKey = Class;
  TTDataNodeWithStringKey = Class Of TDataNodeWithStringKey;

  { This class is a common building block for people making subclasses of
    TDataNode. This is of no interest to people who are only using data
    nodes.

    This handles most of the overhead involved in finding and releasing
    data nodes. The assumption is that each unique node can produce a
    unique key which describes it. The key must be a string. The key must
    be unique for the class, not globally. Keys are (unfortunately) not
    case sensitive. }
  TDataNodeWithStringKey = Class(TDataNode)
  Private
    Key : String;
  Protected
    { This does most of the work of Find. Most Find methods will just call
      this method, then cast the node to a more specific type.
      This will take care of finding an existing copy of the data node, if
      it exists, or creating a new copy if required. This also takes care
      of locking the lookup table, so it may be called from any thread. }
    Class Procedure FindCommon(Source : TTDataNodeWithStringKey;
                               Data : String;
                               OnChange : TThreadMethod;
                               Out Node : TDataNodeWithStringKey;
                               Out Link : TDataNodeLink);

    { This merges the class name with data specific to the class to make a
      globally unique key. }
    Class Function GetKey(Source : TTDataNodeWithStringKey;
                          Data : String) : String;

    { This is used by FindCommon. It is called if we do not already have
      an appropriate data node. We already own the create/destroy lock when
      this is called. The original data from FindCommon is passed back to
      this method as a convenience. Alternatively, the Find procedure may
      store data in thread local storage, and this method may use it. }
    Class Function CreateNew(Data : String) : TDataNodeWithStringKey;
      Virtual; Abstract;
  Public
    { This method removes this data node from the list of data nodes
      available to be reused. }
    Destructor Destroy; Override;
  End;

  { This is a special data node that wakes its listeners when they should
    update. This is used for updating the screen. If we requested a screen
    update every time we got new data, the CPU would quickly go to 100%,
    and the screen would flicker. The user could not read the updates this
    quickly, so these fast updates would serve no purpose.

    There are exactly two data nodes of this class. One executes in the
    DataNode thread. The other executes in the GUI thread. When the latter
    executes, the DataNode thread is suspended. The former is always called
    shortly before the latter. A listener can use one or both of these to
    transfer data from the DataNode thread to the GUI thread in a thread
    safe manner. A listener should only request a repaint when it hears
    from the GuiThread version of this data node.

    The current implementation is trivial. This wakes all listeners every
    100ms.
    This needs improvement. The time should adapt to the actual CPU usage.
    By listening to this, rather than a timer, all components can take
    advantage of the better algorithm when it is available. }
  TScreenUpdater = Class(TDataNode)
  Public
    Class Procedure Find(OnChange : TThreadMethod;
                         Out Link : TDataNodeLink;
                         GuiThread : Boolean = True);
    // The next two procedures call the one above. I added the two below
    // to make the calling code more readable.
    Class Procedure FindInGuiThread(OnChange : TThreadMethod;
                                    Out Link : TDataNodeLink);
    Class Procedure FindBeforeGuiThread(OnChange : TThreadMethod;
                                        Out Link : TDataNodeLink);
  Protected
  Private
    ApplicationEvents : TApplicationEvents;
    TimerCount, UpdateCount : Integer;
    Procedure TimerHandler(Sender: TObject);
    Procedure IdleHandler(Sender: TObject; var Done: Boolean);
    Procedure TimerInDataModelThread;
    Constructor Create(GuiThread : Boolean);
  End;

  { This is just a convenient helper for the TScreenUpdater class. A GUI
    element can create one of these to handle all screen updates. All
    other data nodes that the GUI uses should call this object's NewData
    method. When one of these objects has data, a dirty flag will be set,
    and the owner of this node will be notified at the next appropriate
    time in the GUI thread. AddSubordinate keeps track of the links for
    all the other data nodes, so the GUI only has to keep a link to this
    object.
  }
  TDeferredUpdate = Class(TDataNode)
  Public
    Class Procedure Find(OnChange : TThreadMethod;
                         Out Node : TDeferredUpdate;
                         Out Link : TDataNodeLink);
    Procedure NewData;
    Procedure AddSubordinate(Link : TDataNodeLink);
    Destructor Destroy; Override;
  Private
    ScreenUpdaterLink : TDataNodeLink;
    Dirty : Boolean;
    Subordinates : TList;
    Constructor Create;
    Procedure TimerUpdate;
  End;

Procedure UpdateDebugWindow;
Function ActiveDataNodeCount : Integer;
Procedure GetActiveDataNodeDescriptions(Items : TStrings);

Implementation

Uses
  Timers, DataNodesDebug, DebugOutput, Int64ObjectHashTables, Windows,
  SysUtils, Forms, ExtCtrls, Contnrs;

////////////////////////////////////////////////////////////////////////
// Listener Ids
////////////////////////////////////////////////////////////////////////

Var
  NextListenerId : Int64 = 1;
  ActiveListenerIds : TInt64ObjectHashTable;
  ListenerIdsCriticalSection : TCriticalSection;

Function GetNextListenerId(DebugInfo : TObject) : TListenerId;
Begin
  ListenerIdsCriticalSection.Enter;
  Try
    Result := NextListenerId;
    ActiveListenerIds.setValue(Result, DebugInfo);
    Inc(NextListenerId)
  Finally
    ListenerIdsCriticalSection.Leave
  End
End;

(*Function GetNextListenerId(Listener : TObject) : TListenerId;
Begin
  ListenerIdsCriticalSection.Enter;
  Try
    If Assigned(Listener) Then
      Result := Format('%s($%x):%x',
                       [Listener.ClassName, Integer(Listener),
                        NextListenerId])
    Else
      Result := Format('%x', [NextListenerId]);
    ActiveListenerIds.setValue(Result, Nil);
    Inc(NextListenerId)
  Finally
    ListenerIdsCriticalSection.Leave
  End
End;*)

Function ListenerIdActive(ID : TListenerId) : Boolean;
Begin
  If ID = -1 Then
    Result := True
  Else
  Begin
    ListenerIdsCriticalSection.Enter;
    Try
      Result := ActiveListenerIds.containsKey(ID);
    Finally
      ListenerIdsCriticalSection.Leave
    End
  End
End;

Procedure DeactivateListenerId(ID : TListenerId);
Begin
  ListenerIdsCriticalSection.Enter;
  Try
    ActiveListenerIds.remove(ID);
  Finally
    ListenerIdsCriticalSection.Leave
  End
End;

Function ActiveDataNodeCount : Integer;
Begin
  ListenerIdsCriticalSection.Enter;
  Try
    Result := ActiveListenerIds.getCount
  Finally
    ListenerIdsCriticalSection.Leave
  End
End;

Procedure GetActiveDataNodeDescriptions(Items : TStrings);
Var
  It : TInt64ObjectHashTableIterator;
  Key : Int64;
  Description : String;
  DebugObject : TObject;
Begin
  Items.Clear;
  // Take the lock before creating the iterator, so that the table is
  // never read while another thread is modifying it.
  ListenerIdsCriticalSection.Enter;
  Try
    It := tHashTableIterator.create(ActiveListenerIds);
    Try
      While It.validEntry Do
      Begin
        Key := It.getKey;
        DebugObject := It.getValue;
        If Assigned(DebugObject) Then
          Description := Format('%s($%x)',
                                [DebugObject.ClassName,
                                 Integer(DebugObject)])
        Else
          Description := 'Nil';
        Items.Add(Format('%s; %d', [Description, Key]));
        It.next
      End
    Finally
      It.Free
    End
  Finally
    ListenerIdsCriticalSection.Leave
  End
End;

////////////////////////////////////////////////////////////////////////
// TDataNodeThread
////////////////////////////////////////////////////////////////////////

ThreadVar
  InDataNodeThread : Boolean;

Type
  TCallbackContainer = Class(TObject)
  Private
    StaticallyAllocated : Boolean;
  Public
    Callback : TThreadMethod;
    ListenerId : TListenerId;
    SubmitTime : TDateTime;
  End;

Type
  TDataNodeThread = Class(TThread)
  Private
    FSubmitTime : TDateTime;
    CriticalSection : TCriticalSection;
    Queue : Array [TPriority] Of TObjectQueue;
    Event : THandle;
    CallbackContainerFreeList : TObjectStack;
    Function Get(Unused : Integer) : TThreadMethod;
    Procedure ExecuteOneItem(ToDo : TThreadMethod);
    Function Pop : TCallbackContainer;
    Procedure PreallocateCallbackContainers(Count : Integer);
  Protected
    Procedure Execute; override;
  Public
    AddCount : Cardinal;
    GetCount : Cardinal;
    Constructor Create(CreateSuspended: Boolean);
    Procedure Add(ToDo : TThreadMethod;
                  ListenerId : TListenerId;
                  Priority : TPriority);
    Property SubmitTime : TDateTime Read FSubmitTime Write FSubmitTime;
  End;

Var
  DataNodeThread : TDataNodeThread;

Function DisplayMethod1(M : TThreadMethod) : String;
Var
  Split : Packed Record
    Case Boolean Of
      True : (TM : TThreadMethod);
      False : (First : Pointer; O : TObject);
  End;
  ObjectName, MethodName : String;

  Procedure UnknownMethod;
  Begin
    MethodName := Format('Unknown($%.8x)', [Integer(Split.First)])
  End;

Begin
  Try
    Split.TM := M;
    If Assigned(Split.O) Then
    Begin
      ObjectName := Format('%s($%.8x)',
                           [Split.O.ClassName, Integer(Split.O)]);
      MethodName := Split.O.MethodName(Split.First);
      If MethodName = '' Then
        UnknownMethod
    End
    Else
    Begin
      ObjectName := 'Nil';
      If Assigned(Split.First) Then
        UnknownMethod
      Else
        MethodName := 'Nil'
    End;
    Result := ObjectName + ', ' + MethodName
  Except
    On E : Exception Do
      Result := E.Message
  End
End;

Function DisplayMethod(M : TThreadMethod) : String;
Var
  Split : Packed Record
    Case Boolean Of
      True : (TM : TThreadMethod);
      False : (First : Pointer; O : TObject);
  End;
  ObjectName, MethodName, Channel : String;

  Procedure UnknownMethod;
  Begin
    MethodName := Format('Unknown($%.8x)', [Integer(Split.First)])
  End;

Begin
  Try
    Channel := '';
    Split.TM := M;
    If Assigned(Split.O) Then
    Begin
      ObjectName := Format('%s($%.8x)',
                           [Split.O.ClassName, Integer(Split.O)]);
      MethodName := Split.O.MethodName(Split.First);
      If MethodName = '' Then
        UnknownMethod;
      If Split.O Is TBroadcastMessage Then // For some reason the "Is" operator here causes a null pointer exception when it should return true!
        Channel := (Split.O As TBroadcastMessage).Channel;
    End
    Else
    Begin
      ObjectName := 'Nil';
      If Assigned(Split.First) Then
        UnknownMethod
      Else
        MethodName := 'Nil'
    End;
    Result := ObjectName + ', ' + MethodName + ', ' + Channel
  Except
    On E : Exception Do
      Result := E.Message
  End
End;

Procedure TDataNodeThread.PreallocateCallbackContainers(Count : Integer);
Var
  I : Integer;
  Container: TCallbackContainer;
Begin
  For I := 1 To Count Do
  Begin
    Container := TCallbackContainer.Create;
    Container.StaticallyAllocated := True;
    CallbackContainerFreeList.Push(Container)
  End
End;

Procedure TDataNodeThread.ExecuteOneItem(ToDo : TThreadMethod);
//Var
//  Time : TDateTime;
Begin
  //Time := Now;
  ToDo;
  //Time := Now - Time
End;

{ Procedure TDataNodeThread.Execute;
Var
  ToDo : TThreadMethod;
Begin
  Try
    InDataNodeThread := True;
    Repeat
      ToDo := Get(0);
      If Assigned(ToDo) Then
        ExecuteOneItem(ToDo)
      Else
        WaitForSingleObject(Event, $ffffffff)
    Until False
  Except
    On E : Exception Do
      Application.MessageBox(PChar('In DataNode thread: ' + E.message),
                             'exception');
  End
End;}

Procedure TDataNodeThread.Execute;
Var
  ToDo : TThreadMethod;
Begin
  InDataNodeThread := True;
  Repeat
    ToDo := Get(0);
    If Assigned(ToDo) Then
      Try
        ExecuteOneItem(ToDo)
      Except
        On E : Exception Do
          DebugOutput.DebugOutputWindow.AddMessage('In DataNode thread: '
                                                   + E.message)
      End
    Else
      WaitForSingleObject(Event, $ffffffff)
  Until False
End;

Constructor TDataNodeThread.Create(CreateSuspended: Boolean);
Var
  P : TPriority;
Begin
  For P := Low(TPriority) To High(TPriority) Do
    Queue[P] := TObjectQueue.Create;
  CallbackContainerFreeList := TObjectStack.Create;
  PreallocateCallbackContainers(10000);
  CriticalSection := TCriticalSection.Create;
  Event := CreateEvent(Nil, False, False, Nil);
  Assert(Event <> 0);
  Inherited
End;

Function TDataNodeThread.Pop : TCallbackContainer;
Var
  Priority : TPriority;
Begin
  For Priority := Low(TPriority) To High(TPriority) Do
    If Queue[Priority].AtLeast(1) Then
    Begin
      Result := Queue[Priority].Pop As TCallbackContainer;
      Exit
    End;
  Result := Nil
End;

Procedure TDataNodeThread.Add(ToDo : TThreadMethod;
                              ListenerId : TListenerId;
                              Priority : TPriority);
Var
  Container : TCallbackContainer;
Begin
  CriticalSection.Enter;
  Try
    If CallbackContainerFreeList.AtLeast(1) Then
      Container := CallbackContainerFreeList.Pop As TCallbackContainer
    Else
      Container := TCallbackContainer.Create;
    Container.Callback := ToDo;
    Container.ListenerId := ListenerId;
    Container.SubmitTime := Now;
    Inc(AddCount);
    Queue[Priority].Push(Container)
  Finally
    CriticalSection.Leave
  End;
  SetEvent(Event)
End;

Function TDataNodeThread.Get(Unused : Integer) : TThreadMethod;
Var
  Container : TCallbackContainer;
Begin
  CriticalSection.Enter;
  Try
    Result := Nil;
    FSubmitTime := 0;
    Repeat
      Container := Pop;
      If Not Assigned(Container) Then
        Break;
      Try
        Inc(GetCount);
        If ListenerIdActive(Container.ListenerId) Then
        Begin
          FSubmitTime := Container.SubmitTime;
          Result := Container.Callback;
          Break
        End
      Finally
        If Container.StaticallyAllocated Then
          CallbackContainerFreeList.Push(Container)
        Else
          Container.Free
      End
    Until False
  Finally
    CriticalSection.Leave
  End
End;

Procedure UpdateDebugWindow;
Var
  EventsIn, EventsOut : Integer;
  TimerSleeping : Boolean;
  NextWaittime : Integer;
Begin
  If Assigned(DataNodeThread) Then
  Begin
    DNDebug.AddedtoDNQ.Caption :=
      Format('%.0n', [DataNodeThread.AddCount + 0.0]);
    DNDebug.RemovedFromDNQ.Caption :=
      Format('%.0n', [DataNodeThread.GetCount + 0.0]);
    DNDebug.InDNQ.Caption :=
      Format('%.0n',
             [DataNodeThread.AddCount - DataNodeThread.GetCount + 0.0])
  End;
  DNDebug.ActiveDataNodes.Caption :=
    Format('%.0n', [ActiveDataNodeCount + 0.0]);
  TimerDebugInfo(EventsIn, EventsOut, TimerSleeping, NextWaitTime);
  DNDebug.AddedToTimerQ.Caption := Format('%.0n', [EventsIn + 0.0]);
  DNDebug.RemovedFromTimerQ.Caption := Format('%.0n', [EventsOut + 0.0]);
  DNDebug.InTimerQ.Caption := Format('%.0n', [EventsIn - EventsOut + 0.0]);
  If TimerSleeping Then
    DNDebug.TimerStatus.Caption := 'Sleeping'
  Else
    DNDebug.TimerStatus.Caption := 'Working';
  If NextWaittime = MaxInt Then
    DNDebug.NextWaitTime.Caption := 'MaxInt'
  else
    DNDebug.NextWaitTime.Caption := Format('%.0nms', [NextWaitTime + 0.0])
End;

////////////////////////////////////////////////////////////////////////
// TBroadcastMessage
////////////////////////////////////////////////////////////////////////

Type
  TBroadcastContainer = Class(TObject)
  Public
    Node : TDataNode;
    Callback : TBroadcastCallback;
  End;

Var
  AllBroadcastListeners : TStringObjectHashTable;

// Assume we are in the data node thread, so we don't lock anything.
//
// It is possible that, in the process of delivering a message, a new
// listener will request the message. That's why we copy the list of
// listeners before we notify any of them. Otherwise we would get an
// iterator out of date error. In this case, no error is generated,
// but the new listener will not see this message. It will see the
// next message of this type.
//
// It is also possible that, while delivering a message, a listener
// unregisters for that message. No call will be made after the
// unregister request.
//
// It is possible that, while delivering a message, a listener will
// unregister, then re-register for that message. In that case we
// do the call back. If the callback changes, we use the last callback
// that was registered.
Procedure TBroadcastMessage.Deliver;
Var
  Listeners : TIntegerObjectHashTable;
  I : Integer;
  It : TIntegerObjectHashTableIterator;
  Keys : Array Of Integer;
  Container : TBroadcastContainer;
Begin
  BeforeDelivery;
  Listeners := TIntegerObjectHashTable(AllBroadcastListeners[FChannel]);
  If Assigned(Listeners) Then
  Begin
    SetLength(Keys, Listeners.getCount);
    It := Listeners.getIterator;
    Try
      For I := 0 To Pred(Length(Keys)) Do
      Begin
        Keys[I] := It.key;
        It.next
      End
    Finally
      It.Free
    End;
    For I := 0 To Pred(Length(Keys)) Do
    Begin
      // StopListening frees the channel's table when its last listener
      // unregisters, and a callback may trigger that. Look the table up
      // again before each delivery rather than reusing a stale pointer.
      Listeners := TIntegerObjectHashTable(AllBroadcastListeners[FChannel]);
      If Not Assigned(Listeners) Then
        Break;
      Container := Listeners[Keys[I]] As TBroadcastContainer;
      If Assigned(Container) Then
        DeliverTo(Container.Node, Container.Callback)
    End
  End;
  AfterDelivery
End;

Procedure TBroadcastMessage.DeliverTo(Node : TDataNode;
                                      Callback : TBroadcastCallback);
Begin
  Callback(Self, Node)
End;

// Assume we are in the data node thread, so we don't lock anything.
// The caller takes responsibility for ensuring there are no duplicates.
Class Procedure TBroadcastMessage.ListenTo(Channel : String;
                                           Node : TDataNode;
                                           Callback : TBroadcastCallback);
Var
  Listeners : TIntegerObjectHashTable;
  Container : TBroadcastContainer;
Begin
  Listeners := TIntegerObjectHashTable(AllBroadcastListeners[Channel]);
  If Not Assigned(Listeners) Then
  Begin
    Listeners := TIntegerObjectHashTable.Create;
    AllBroadcastListeners[Channel] := Listeners
  End;
  Container := TBroadcastContainer.Create;
  Container.Node := Node;
  Container.Callback := Callback;
  Listeners[Integer(Node)] := Container
End;

// Assume we are in the data node thread, so we don't lock anything.
// The caller takes responsibility for ensuring that the given entry exists
// exactly once in the list before this call.
Class Procedure TBroadcastMessage.StopListening(Channel : String; Node : TDataNode);
Var
  Listeners : TIntegerObjectHashTable;
Begin
  Listeners := TIntegerObjectHashTable(AllBroadcastListeners[Channel]);
  Listeners.remove(Integer(Node)).Free;
  If Listeners.Count = 0 Then
  Begin
    AllBroadcastListeners.Remove(Channel);
    Listeners.Free
  End
End;

Procedure TBroadcastMessage.AfterDelivery;
Begin
  Free
End;

Procedure TBroadcastMessage.BeforeDelivery;
Begin
End;

Procedure TBroadcastMessage.Send(Channel : String; Immediately : Boolean = False; Priority : TPriority = pNormal);
Begin
  FChannel := Channel;
  If Immediately And InDataNodeThread Then
    Deliver
  Else
    TDataNode.AddToThreadQueue_(Deliver, Priority)
End;

Procedure TBroadcastMessage.NullSend(Priority : TPriority = pNormal);
Begin
  Send('Null', False, Priority)
End;

////////////////////////////////////////////////////////////////////////
// TDataNode
////////////////////////////////////////////////////////////////////////

Var
  CreateDestroyLock : TCriticalSection;
  AutoLinksLock : TCriticalSection;
  //DataNodeCount : Integer;
  //DebugLast : String;

Procedure TDataNode.AddAutoLink(Link : TDataNodeLink);
Var
  NewIndex : Integer;
Begin
  Assert(Assigned(Link), 'AddAutoLink(Nil)');
  AutoLinksLock.Enter;
  Try
    NewIndex := Length(AutomaticLinks);
    SetLength(AutomaticLinks, Succ(NewIndex));
    AutomaticLinks[NewIndex] := Link
  Finally
    AutoLinksLock.Leave
  End
End;

Procedure TDataNode.SetAutoLinksActive(Active : Boolean);
Var
  I : Integer;
  LinksCopy : TDataNodeLinkArray;
Begin
  AutoLinksLock.Enter;
  Try
    LinksCopy := Copy(AutomaticLinks, 0, MaxInt)
  Finally
    AutoLinksLock.Leave
  End;
  For I := 0 To Pred(Length(LinksCopy)) Do
    LinksCopy[I].SetReceiveInput(Active)
End;

Procedure TDataNode.ReleaseAutoLinks;
Var
  I : Integer;
  LinksCopy : TDataNodeLinkArray;
Begin
  AutoLinksLock.Enter;
  Try
    LinksCopy := AutomaticLinks;
    SetLength(AutomaticLinks, 0)
  Finally
    AutoLinksLock.Leave
  End;
  For I := 0 To Pred(Length(LinksCopy)) Do
    LinksCopy[I].Release
End;

Procedure TDataNode.AfterConstruction;
Begin
  Inherited;
  SetAutoLinksActive(True)
End;

Class Procedure TDataNode.SetSubmitTime(Time : TDateTime);
Begin
  DataNodeThread.SubmitTime := Time
End;

Class Function TDataNode.GetSubmitTime : TDateTime;
Begin
  Result := DataNodeThread.SubmitTime
End;

Procedure TDataNode.RegisterForBroadcast(Channel : String; Callback : TBroadcastCallback);
Begin
  If BroadcastChannels.IndexOf(Channel) = -1 Then
  Begin
    BroadcastChannels.Add(Channel);
    TBroadcastMessage.ListenTo(Channel, Self, Callback)
  End
End;

Procedure TDataNode.UnRegisterForBroadcast(Channel : String);
Var
  Index : Integer;
Begin
  Index := BroadcastChannels.IndexOf(Channel);
  If (Index <> -1) Then
  Begin
    BroadcastChannels.Delete(Index);
    TBroadcastMessage.StopListening(Channel, Self)
  End
End;

Class Function TDataNode.GetCreateDestroyLock : TCriticalSection;
Begin
  Result := CreateDestroyLock
End;

Function TDataNode.CreateLink(OnChange : TThreadMethod) : TDataNodeLink;
Begin
  Result := TDataNodeLink.Create(Self, OnChange);
  AddLink(Result)
End;

Procedure TDataNode.AddLink(Link : TDataNodeLink);
Begin
  // We must already be in CreateDestroyLock. Otherwise this object
  // could be deleted before AddLink() was called.
  CriticalSection.Enter;
  Try
    If Assigned(Link.Callback) Then
      Listeners[Integer(Link)] := Link
    Else
      OtherOwners[Integer(Link)] := Link
  Finally
    CriticalSection.Leave
  End
End;

Function TDataNode.ReadyForRelease : Boolean;
Begin
  CriticalSection.Enter;
  Try
    Result := (Not Immortal) And (Listeners.count = 0) And (OtherOwners.count = 0)
  Finally
    CriticalSection.Leave
  End
End;

Procedure TDataNode.RemoveLink(Link : TDataNodeLink);
Begin
  CreateDestroyLock.Enter;
  Try
    CriticalSection.Enter;
    Try
      If Assigned(Link.Callback) Then
        Listeners.Remove(Integer(Link))
      Else
        OtherOwners.Remove(Integer(Link))
    Finally
      CriticalSection.Leave
    End;
    If ReadyForRelease Then
      { Ideally we'd send this at low priority, but that could cause
        problems. If somehow we were getting flooded with data, we'd
        never be able to turn it off!
      }
      DoInCorrectThread(Release, pNormal)
    { Hands off! Now that we (may have) decremented the reference counter
      to zero, and (possibly) called the release function, this object may
      already be deleted. }
  Finally
    CreateDestroyLock.Leave
  End
End;

Procedure TDataNode.Release;
Begin
  { If someone brought the reference count to 0 in another thread, it is
    possible that someone has already brought it back up. Enter the
    CreateDestroy critical section and check. }
  CreateDestroyLock.Enter;
  Try
    If ReadyForRelease Then
      { We are now safe. The reference count is 0, so no one will touch
        this object from above. We have the CreateDestroy critical
        section, so no one can find the object in a list and bump the
        reference count up. We are in the data node thread and we own the
        CreateDestroy lock, so this object's critical section would be
        redundant. We are in the data node thread, so no subordinate data
        nodes can contact this object. }
      Free
  Finally
    CreateDestroyLock.Leave
  End
End;

Destructor TDataNode.Destroy;
Var
  I : Integer;
Begin
  DeactivateListenerId(FListenerId);
  For I := 0 To Pred(BroadcastChannels.Count) Do
    TBroadcastMessage.StopListening(BroadcastChannels[I], Self);
  BroadcastChannels.Free;
  ReleaseAutoLinks;
  Listeners.Free;
  OtherOwners.Free;
  CriticalSection.Free;
  Inherited
End;

Constructor TDataNode.Create(Immortal : Boolean);
Begin
  CriticalSection := TCriticalSection.Create;
  Listeners := TIntegerObjectHashTable.Create;
  OtherOwners := TIntegerObjectHashTable.Create;
  FListenerId := GetNextListenerId(Self);
  BroadcastChannels := TStringList.Create;
  Self.Immortal := Immortal
End;

Procedure TDataNode.NotifyListeners;
Var
  I : Integer;
  ListenersCopy : Array Of Integer;
  Listener : TDataNodeLink;
  It : TIntegerObjectHashTableIterator;
Begin
  CriticalSection.Enter;
  It := Nil;
  Try
    SetLength(ListenersCopy, Listeners.count);
    It := Listeners.GetIterator;
    I := 0;
    While It.validEntry Do
    Begin
      ListenersCopy[I] := It.getKey;
      Inc(I);
      It.next
    End;
    For I := 0 To Pred(Length(ListenersCopy)) Do
    Begin
      // No other threads will modify the listeners list, but
      // each call to notify listener could add or remove links.
      Listener := Listeners[ListenersCopy[I]] As TDataNodeLink;
      // An earlier callback may have removed this link, in which case the
      // lookup returns Nil. Skip it, as Deliver does for broadcasts.
      If Assigned(Listener) Then
        Listener.NotifyListener
    End
  Finally
    CriticalSection.Leave;
    It.Free
  End
End;

Procedure TDataNode.AddToThreadQueue(ToDo : TThreadMethod; Priority : TPriority = pNormal);
Begin
  Assert(FListenerId <> 0, 'Not initialized: ' + ClassName);
  DataNodeThread.Add(ToDo, FListenerId, Priority)
End;

Class Procedure TDataNode.AddToThreadQueue_(ToDo : TThreadMethod; Priority : TPriority = pNormal; ListenerId : TListenerId = -1);
Begin
  DataNodeThread.Add(ToDo, ListenerId, Priority)
End;

Procedure TDataNode.DoInCorrectThread(ToDo : TThreadMethod; Priority : TPriority = pNormal);
Begin
  If InDataNodeThread Then
    ToDo
  Else
    AddToThreadQueue(ToDo, Priority)
End;

Class Procedure TDataNode.DoInGuiThread(ToDo : TThreadMethod);
Begin
  DataNodeThread.Synchronize(ToDo)
End;

////////////////////////////////////////////////////////////////////////
// DataNodeLink
////////////////////////////////////////////////////////////////////////

Constructor TDataNodeLink.Create(Source : TDataNode; OnChange : TThreadMethod);
Begin
  Self.Source := Source;
  Callback := OnChange
End;

Procedure TDataNodeLink.SetReceiveInput(B : Boolean);
Begin
  If B Then
    Assert(Assigned(Self), 'Calling SetReceiveInput(True) on Nil')
  Else
    Assert(Assigned(Self), 'Calling SetReceiveInput(False) on Nil');
  If B Then
  Begin
    If Assigned(Callback) Then
      ReceiveInput := True
  End
  Else
  Begin
    { Strange error here. Access violation, attempt to read FFFFFFFF.
      Happened when trying to create the alerts window. Automatic retry
      worked without incident. Again, this time reading 0000000C.
    }
    Source.CriticalSection.Enter;
    ReceiveInput := False;
    Source.CriticalSection.Leave
  End
End;

Procedure TDataNodeLink.NotifyListener;
Begin
  If Assigned(Callback) And ReceiveInput Then
    Try
      Callback
    Except
      On E : Exception Do
        If (E Is EExternal) And Assigned(EExternal(E).ExceptionRecord) Then
          Raise Exception.CreateFmt('(%s, %s) raised in (%s) at $%p',
            [E.ClassName, E.Message, DisplayMethod1(Callback),
             EExternal(E).ExceptionRecord^.ExceptionAddress])
        Else
          Raise Exception.CreateFmt('(%s, %s) raised in (%s)',
            [E.ClassName, E.Message, DisplayMethod1(Callback)])
    End
End;

Procedure TDataNodeLink.Release;
Begin
  Assert(Assigned(Self));
  SetReceiveInput(False);
  Source.RemoveLink(Self);
  Free
End;

////////////////////////////////////////////////////////////////////////
// TDataNodeWithStringKey
////////////////////////////////////////////////////////////////////////

Var
  TDataNodeWithStringKeyInstances : TStringObjectHashTable;

Class Function TDataNodeWithStringKey.GetKey(Source : TTDataNodeWithStringKey; Data : String) : String;
Begin
  Result := Source.ClassName + ';' + Data
End;

Destructor TDataNodeWithStringKey.Destroy;
Var
  Previous : TObject;
Begin
  Assert(Key <> '');
  Previous := TDataNodeWithStringKeyInstances.remove(Key);
  Assert(Assigned(Previous));
  Inherited
End;

Class Procedure TDataNodeWithStringKey.FindCommon(Source : TTDataNodeWithStringKey; Data : String; OnChange : TThreadMethod; Out Node : TDataNodeWithStringKey; Out Link : TDataNodeLink);
Var
  Key : String;
Begin
  Key := GetKey(Source, Data);
  GetCreateDestroyLock.Enter;
  Try
    Node := TDataNodeWithStringKeyInstances[Key] As TDataNodeWithStringKey;
    If Not Assigned(Node) Then
    Begin
      // Not Found. Create New.
      Node := CreateNew(Data);
      Node.Key := Key;
      TDataNodeWithStringKeyInstances[Key] := Node
    End;
    Link := Node.CreateLink(OnChange)
  Finally
    GetCreateDestroyLock.Leave
  End
End;

////////////////////////////////////////////////////////////////////////
// TScreenUpdater
////////////////////////////////////////////////////////////////////////

Var
  ScreenUpdaterInitialized : Boolean = False;
  ScreenUpdaterInstances : Array [Boolean] Of TScreenUpdater;

Class Procedure TScreenUpdater.Find(OnChange : TThreadMethod; Out Link : TDataNodeLink; GuiThread : Boolean);
Begin
  GetCreateDestroyLock.Enter;
  Try
    If Not ScreenUpdaterInitialized Then
    Begin
      ScreenUpdaterInstances[True] := Create(True);
      ScreenUpdaterInstances[False] := Create(False);
      ScreenUpdaterInitialized := True
    End;
    Link := ScreenUpdaterInstances[GuiThread].CreateLink(OnChange)
  Finally
    GetCreateDestroyLock.Leave
  End
End;

Class Procedure TScreenUpdater.FindInGuiThread(OnChange : TThreadMethod; Out Link : TDataNodeLink);
Begin
  Find(OnChange, Link, True)
End;

Class Procedure TScreenUpdater.FindBeforeGuiThread(OnChange : TThreadMethod; Out Link : TDataNodeLink);
Begin
  Find(OnChange, Link, False)
End;

Procedure TScreenUpdater.TimerHandler(Sender: TObject);
Begin
  Inc(TimerCount);
  If (TimerCount > 9) Then
  Begin
    UpdateCount := 0;
    TimerCount := 0;
    AddToThreadQueue(TimerInDataModelThread)
  End
  Else
    Inc(UpdateCount)
End;

Procedure TScreenUpdater.IdleHandler(Sender: TObject; var Done: Boolean);
Begin
  Inc(UpdateCount)
End;

Procedure TScreenUpdater.TimerInDataModelThread;
Begin
  NotifyListeners;
  DoInGuiThread(ScreenUpdaterInstances[True].NotifyListeners)
End;

Constructor TScreenUpdater.Create(GuiThread : Boolean);
Var
  Timer : TTimer;
Begin
  Inherited Create(True);
  If (Not GuiThread) Then
  Begin
    ApplicationEvents := TApplicationEvents.Create(Application);
    ApplicationEvents.OnIdle := IdleHandler;
    Timer := TTimer.Create(Application);
    Timer.Enabled := True;
    Timer.Interval := 10;
    Timer.OnTimer := TimerHandler;
    //RequestPeriodicCallback(TimerHandler, 100)
  End
End;

////////////////////////////////////////////////////////////////////////
// TDeferredUpdate
////////////////////////////////////////////////////////////////////////

Class Procedure TDeferredUpdate.Find(OnChange : TThreadMethod; Out Node : TDeferredUpdate; Out Link : TDataNodeLink);
Begin
  Node := Create;
  Link := Node.CreateLink(OnChange)
End;

Procedure TDeferredUpdate.NewData;
Begin
  Dirty := True
End;

Constructor TDeferredUpdate.Create;
Begin
  Inherited Create;
  Subordinates := TList.Create;
  TScreenUpdater.Find(TimerUpdate, ScreenUpdaterLink);
  ScreenUpdaterLink.SetReceiveInput(True)
End;

Destructor TDeferredUpdate.Destroy;
Var
  I : Integer;
Begin
  For I := 0 To Pred(Subordinates.Count) Do
    TDataNodeLink(Subordinates[I]).Release;
  Subordinates.Free;
  ScreenUpdaterLink.Release;
  Inherited
End;

Procedure TDeferredUpdate.AddSubordinate(Link : TDataNodeLink);
Begin
  Subordinates.Add(Link);
  Link.SetReceiveInput(True)
End;

Procedure TDeferredUpdate.TimerUpdate;
Begin
  If Dirty Then
  Begin
    Dirty := False;
    NotifyListeners
  End
End;

////////////////////////////////////////////////////////////////////////
// Initialization
////////////////////////////////////////////////////////////////////////

Initialization
  DataNodeThread := TDataNodeThread.Create(False);
  CreateDestroyLock := TCriticalSection.Create;
  AutoLinksLock := TCriticalSection.Create;
  TDataNodeWithStringKeyInstances := TStringObjectHashTable.Create;
  ListenerIdsCriticalSection := TCriticalSection.Create;
  ActiveListenerIds := TInt64ObjectHashTable.Create;
  AllBroadcastListeners := TStringObjectHashTable.Create;
  ActiveListenerIds.Shrink
End.