using System;
using System.Diagnostics;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using TradeIdeas.ServerConnection;
using TradeIdeas.MarketDataProxy.DataFormats;

namespace TradeIdeas.MarketDataProxy
{
    /// <summary>
    /// Owns the worker thread that runs every <see cref="SymbolListenerBase"/> and the
    /// queues used to hand requests from other threads to that worker.
    /// </summary>
    public class SymbolListenerThread
    {
        /* This object is responsible for managing the individual symbol listener objects.
         *
         * For simplicity, all symbol listener objects will be created in the beginning, while we are setting up,
         * before we start any new threads or start listening to external requests.
         *
         * This object will create a thread for running the symbol listener objects.  This object contains the code
         * needed to accept requests from various external threads and execute them within this thread.  This will
         * simplify the job of the SymbolListeners because they will only have to live in one thread.  This will
         * also take care of setting reasonable priorities for the various messages.  And it will allow us to see
         * if the processing of external messages is getting behind because the queues will be backed up.
         */

        // Set by StartProcessing() just before the worker thread is launched.  Volatile because Add() may
        // read it from a different thread than the one that called StartProcessing().
        private volatile bool _started = false;

        // Symbol -> listener.  Only written by Add(), i.e. before processing starts; read-only afterwards.
        // NOTE(review): this map is static while _started is per instance, so every SymbolListenerThread
        // instance shares one symbol table.  That is harmless with a single instance but looks unintended
        // if several instances are ever created -- confirm before changing.
        private static readonly Dictionary<string, SymbolListenerBase> _allActive =
            new Dictionary<string, SymbolListenerBase>();

        /// <summary>
        /// Registers the listener responsible for <paramref name="symbol"/>.  Must be called during
        /// setup, before <see cref="StartProcessing"/>.
        /// </summary>
        /// <exception cref="ApplicationException">
        /// Processing has already started, or the symbol already has a listener.
        /// </exception>
        public void Add(string symbol, SymbolListenerBase listener)
        {
            if (_started)
                throw new ApplicationException("Cannot add new symbols after the thread has started");
            if (_allActive.ContainsKey(symbol))
                throw new ApplicationException("Duplicate implementation for symbol \"" + symbol + '"');
            _allActive[symbol] = listener;
        }

        // This contains events from Teletrader, TAL, DTN, or another data provider.
        private class NewDataRequest
        {
            public string symbol;
            public object data;
        }
        private readonly Queue<NewDataRequest> _newDataRequests = new Queue<NewDataRequest>();

        // An external listener wants information about a stock.
        private class NewSubscriptionRequest
        {
            public string symbol;
            public ConnectionListener connection;
        }
        // A linked list rather than a queue because CloseConnectionImpl() removes entries from the middle.
        private readonly LinkedList<NewSubscriptionRequest> _newSubscriptionRequests =
            new LinkedList<NewSubscriptionRequest>();

        // An external listener hung up.
        private readonly Queue<ConnectionListener> _connectionsToClose = new Queue<ConnectionListener>();

        // Someone requested a full debug dump.  For simplicity we don't have a full queue.  Presumably
        // the user will hit a button and see it on one window.  If it hits it twice in a row
        // quickly, we don't care if we miss one.
        public delegate void DebugResponse(string description);
        private volatile DebugResponse _debugResponse;

        // Any of the queues got new data.
        private readonly AutoResetEvent _checkQueues = new AutoResetEvent(false);

        // These operations are all thread safe.  These allow you to send requests into the appropriate thread.

        /// <summary>
        /// Queues a data-provider event for <paramref name="symbol"/>.  Thread safe.
        /// A null symbol is silently ignored.
        /// </summary>
        public void NewData(string symbol, object data)
        {
            // data is just a pass through for us.  The specific implementation of SymbolListenerBase should give it
            // some meaning.  In the simplest case it would be a pointer to the message from the data provider.
            // Presumably the data only contains changes.  The SymbolListener object will contain a cached version
            // of the state, and most things will stay the same.

            // History: this used to be Debug.Assert(symbol != null).  A null symbol was actually seen once
            // (caught in the other thread, so the sender is unknown -- PDS), and the failed assertion halted
            // the whole GUI, so the assertion was replaced by this check as a temporary fix.
            if (symbol == null)
                return;

            NewDataRequest r = new NewDataRequest();
            r.symbol = symbol;
            r.data = data;
            lock (_newDataRequests)
            {
                _newDataRequests.Enqueue(r);
            }
            _checkQueues.Set();
        }

        /// <summary>
        /// Queues a subscription request from an external connection.  Thread safe.
        /// Requests for symbols this server does not cover (including null) are silently ignored.
        /// </summary>
        public void NewSubscription(string symbol, ConnectionListener connection)
        {
            // We don't check this for market data, because that is all internal.  We should not be requesting
            // market data unless the symbol is in the list.  However, subscriptions come from an external
            // source.  The external source might not know that we are not covering a symbol.  In fact, that's
            // quite likely to happen.  We often break up the work into two or more servers, split the work
            // evenly, allocating symbols almost randomly.  The user is expected to send all requests to all
            // servers, and let this program figure out what to follow.
            //
            // The null test comes first because Dictionary.ContainsKey(null) throws, and this input is external.
            if (symbol == null || !_allActive.ContainsKey(symbol))
                return;

            NewSubscriptionRequest r = new NewSubscriptionRequest();
            r.symbol = symbol;
            r.connection = connection;
            lock (_newSubscriptionRequests)
            {
                _newSubscriptionRequests.AddLast(r);
            }
            _checkQueues.Set();
        }

        /// <summary>
        /// Queues the removal of <paramref name="connection"/> from every listener.  Thread safe.
        /// </summary>
        public void CloseConnection(ConnectionListener connection)
        {
            lock (_connectionsToClose)
            {
                _connectionsToClose.Enqueue(connection);
            }
            _checkQueues.Set();
        }

        /// <summary>
        /// Asks the worker thread to describe every listener and pass the text to
        /// <paramref name="callback"/>.  Only the most recent pending request is kept.
        /// </summary>
        public void RequestDebugDump(DebugResponse callback)
        {
            // There are some possible performance issues here.  This is intended only for occasional prodding,
            // probably never used in a live system.  The callback will be called in this thread.  Presumably
            // it will be fast, sending the data to the GUI thread and immediately returning.
            _debugResponse = callback;
            _checkQueues.Set();
        }

        /// <summary>
        /// Starts the background worker thread.  After this call <see cref="Add"/> is no longer allowed.
        /// </summary>
        public void StartProcessing()
        {
            // Mark as started here, not inside the worker, so that Add() is rejected as soon as this
            // method has been entered rather than whenever the new thread happens to get scheduled.
            _started = true;
            Thread t = new Thread(new ThreadStart(ThreadFunction));
            t.IsBackground = true;  // When the GUI is done, we are done!
            t.Start();
        }

        // Body of the worker thread.  Handles at most one request per loop iteration and then
        // starts over from the top, so that higher priority work is always looked at first.
        private void ThreadFunction()
        {
            while (true)
            {
                // Notice the priorities.
                // * Really first is the debug request.  These should be rare and someone is watching the
                //   situation closely, or this is not a production system so no one cares.
                // * First, take care of any close requests.  Sending data to closed connections would be a waste.
                //   This is especially important if we are having connection problems and we keep disconnecting
                //   and reconnecting.  Also, there should be a finite number of close requests!
                // * If there are no close requests, check for data requests.  This is the main thing we want to
                //   do.  This should never get (very far) behind if we are doing our job right.
                // * If there are no other requests, then take care of new subscriptions.  Hopefully we won't have
                //   to wait too long.  If there are problems, particularly if someone keeps connecting and
                //   disconnecting, we don't want that to slow down the people who are already connected.
                // * If there are no requests at all, wait until we get woken again.
                DebugResponse debugResponse = _debugResponse;
                if (debugResponse != null)
                {
                    // A request that arrives between the read above and this write is lost.
                    // That is accepted; see the comment on _debugResponse.
                    _debugResponse = null;
                    RequestDebugDumpImpl(debugResponse);
                    continue;
                }

                ConnectionListener connectionToClose = null;
                lock (_connectionsToClose)
                {
                    if (_connectionsToClose.Count > 0)
                        connectionToClose = _connectionsToClose.Dequeue();
                }
                if (connectionToClose != null)
                {
                    CloseConnectionImpl(connectionToClose);
                    continue;
                }

                NewDataRequest newDataRequest = null;
                lock (_newDataRequests)
                {
                    if (_newDataRequests.Count > 0)
                        newDataRequest = _newDataRequests.Dequeue();
                }
                if (newDataRequest != null)
                {
                    NewDataImpl(newDataRequest);
                    continue;
                }

                NewSubscriptionRequest newSubscriptionRequest = null;
                lock (_newSubscriptionRequests)
                {
                    if (_newSubscriptionRequests.Count > 0)
                    {
                        newSubscriptionRequest = _newSubscriptionRequests.First.Value;
                        _newSubscriptionRequests.RemoveFirst();
                    }
                }
                if (newSubscriptionRequest != null)
                {
                    NewSubscriptionImpl(newSubscriptionRequest);
                    continue;
                }

                _checkQueues.WaitOne();
            }
        }

        // Worker thread only.  Concatenates ToString() of every listener and hands it to the callback.
        private void RequestDebugDumpImpl(DebugResponse debugResponse)
        {
            StringBuilder description = new StringBuilder();
            foreach (KeyValuePair<string, SymbolListenerBase> listener in _allActive)
            {
                description.Append(listener.Value.ToString());
                description.Append("==========================\r\n");
            }
            debugResponse(description.ToString());
        }

        // Worker thread only.  Forgets the connection everywhere: pending subscriptions and live ones.
        private void CloseConnectionImpl(ConnectionListener connection)
        {
            lock (_newSubscriptionRequests)
            {
                // Quickly delete any pending attempts to subscribe.  They are clearly obsolete by now.
                LinkedListNode<NewSubscriptionRequest> current = _newSubscriptionRequests.First;
                while (current != null)
                {
                    // Iterate ourselves because foreach doesn't let you modify the list while you are iterating.
                    // Grab Next before removing, because removal detaches the node from the list.
                    LinkedListNode<NewSubscriptionRequest> next = current.Next;
                    if (current.Value.connection == connection)
                        _newSubscriptionRequests.Remove(current);
                    current = next;
                }
            }
            // And remove any current subscriptions.
            foreach (KeyValuePair<string, SymbolListenerBase> listener in _allActive)
                listener.Value.Release(connection);
        }

        // Worker thread only.  Forwards the data to the symbol's listener and updates the statistics.
        private void NewDataImpl(NewDataRequest r)
        {
            // Deliberately unchecked: market data is internal and should only exist for registered symbols.
            _allActive[r.symbol].NewData(r.data);

            // UTC so that a daylight-saving change cannot stretch or shrink the two minute window.
            DateTime now = DateTime.UtcNow;
            lock (_allMessages)
            {
                _allMessages.Add(r.symbol);
                _allMessageCount++;
                if (now >= _nextRestartTime)
                {
                    // Start a new "recent" window.
                    _recentMessages.Clear();
                    _recentMessageCount = 0;
                    _lastRestartTime = now;
                    _nextRestartTime = now.AddMinutes(2);
                }
                // The message that opened the window belongs to that window, so always count it.
                _recentMessages.Add(r.symbol);
                _recentMessageCount++;
            }
        }

        // Worker thread only.  NewSubscription() already verified that the symbol is registered.
        private void NewSubscriptionImpl(NewSubscriptionRequest r)
        {
            _allActive[r.symbol].AddListener(r.connection);
        }

        // This is thread safe.  It's never necessary, but it would be nice for the GUI to call this about once
        // per second and display the results.
        public class Status
        {
            public int subscriptionQueueSize;
            public int dataProviderQueueSize;
            public int disconnectQueueSize;
            public int totalMessages;
            public int totalSymbols;
            public int recentMessages;
            public int recentSymbols;
            public TimeSpan recentInterval;
        }

        /// <summary>
        /// Returns a snapshot of the queue sizes and message statistics.  Thread safe.
        /// Each group of values is read under its own lock, so the groups are not
        /// guaranteed to be mutually consistent.
        /// </summary>
        public Status GetStatus()
        {
            Status result = new Status();
            lock (_newSubscriptionRequests)
            {
                result.subscriptionQueueSize = _newSubscriptionRequests.Count;
            }
            lock (_newDataRequests)
            {
                result.dataProviderQueueSize = _newDataRequests.Count;
            }
            lock (_connectionsToClose)
            {
                result.disconnectQueueSize = _connectionsToClose.Count;
            }
            lock (_allMessages)
            {
                result.totalMessages = _allMessageCount;
                result.totalSymbols = _allMessages.Count;
                result.recentMessages = _recentMessageCount;
                result.recentSymbols = _recentMessages.Count;
                // NOTE(review): start minus now is zero or negative.  The operand order is kept as it
                // was because consumers of Status are not visible here -- confirm the intended sign.
                result.recentInterval = _lastRestartTime - DateTime.UtcNow;
            }
            return result;
        }

        // Lock _allMessages to access any of these.  The two timestamps are UTC.
        private readonly HashSet<string> _allMessages = new HashSet<string>();
        private readonly HashSet<string> _recentMessages = new HashSet<string>();
        private int _allMessageCount = 0;
        private int _recentMessageCount = 0;
        private DateTime _lastRestartTime = DateTime.UtcNow;
        private DateTime _nextRestartTime = DateTime.UtcNow;
    }

    /// <summary>
    /// Cached state and subscriber list for one symbol.  Derived classes interpret provider data.
    /// </summary>
    public abstract class SymbolListenerBase
    {
        // This object contains the current cached information about a symbol.  For example, the high is 59, the low
        // is 50.50, and the last price is 52.  We might get a message from the data provider saying that the last
        // price changed to 53, but not telling us anything else.  This object will update the last price, it will
        // assume that the high and low did not change, and it will report the new TOS record to everyone who is
        // listening for this symbol.  Listeners can also request a complete dump of the state without any new data
        // from the data provider.  In fact, adding a new listener automatically requests that info.
        //
        // This object is also responsible for keeping track of who is listening.  This object will continue to
        // keep the state up to date, even if no one is currently listening.
        //
        // I've tried to separate this into a base class and an inherited class.  This base class will know about
        // listeners.  The inherited class will know how to interpret data from the data provider.  This is far
        // from perfect.  In particular, it's not obvious how we'd handle new subscription types, like the regional
        // data.  For now, when we are only handling the Xetra, we only have two types of messages (TOS and L1)
        // and one type of subscription (which requests both types of messages).
        //
        // Each object is only used in one thread.  So it should not require any locks.  If performance becomes an
        // issue, we might have multiple threads.  But each symbol / object would be assigned to only one thread.
        // So try to avoid static data.  Static data might have to be locked.

        private readonly string _symbol;

        public SymbolListenerBase(string symbol)
        {
            _symbol = symbol;
        }

        public string Symbol
        {
            get { return _symbol; }
        }

        // _connections is a list of listeners.
        private readonly HashSet<ConnectionListener> _connections = new HashSet<ConnectionListener>();

        // Guards the global debug counter below.  It has to be shared by all listeners: locking on the
        // individual listener would let two listeners update the counter at the same time.
        private static readonly object _sendCounterGate = new object();

        /// <summary>Stops sending to <paramref name="connection"/>.  A no-op if it was not subscribed.</summary>
        public void Release(ConnectionListener connection)
        {
            _connections.Remove(connection);
        }

        /// <summary>
        /// Subscribes <paramref name="connection"/> and immediately sends it the current state.
        /// </summary>
        public void AddListener(ConnectionListener connection)
        {
            _connections.Add(connection);
            ForceSend(connection);
            // For debug...  ForceSend uses the Send() function.
            lock (_sendCounterGate)
            {
                TradeIdeas.MarketDataProxy.Globals.sendFunctionCounter++;
            }
        }

        // Notice that these default to null.  That's probably reasonable until the first time that we get data from
        // the data provider.  It might also be the case that we return to null.  That typically depends on the
        // data provider.  Sometimes they give us some indication that the link is down, or that the data is old.
        // We don't always switch back.
        protected WireL1 _l1Data = null;
        protected WireTos _tosData = null;

        // These are integer constants, rather than an enum, so that other classes can add new data types easily.
        protected static readonly int CONTEXT_ALL = 0;
        protected static readonly int CONTEXT_L1 = 1;
        protected static readonly int CONTEXT_TOS = 2;

        // This next bunch of routines are all virtual, in case you want to add new data types.

        protected virtual void ForceSend(ConnectionListener connection)
        {
            // Send the current state to the listener because the listener asked for it.
            Send(connection, CONTEXT_ALL);
        }

        // Send the current state to the listener.  Context is only meaningful to the
        // derived class.  You might want to send only the TOS or only the L1.
        // Maybe something more complicated.  Note that the default ForceSend()
        // uses context==0 to mean send everything, but you can replace that.
        protected virtual void Send(ConnectionListener connection, int context)
        {
            if ((context == CONTEXT_ALL) || (context == CONTEXT_L1))
            {
                // No cached data yet (or any more): tell the listener so explicitly.
                if (_l1Data == null)
                    connection.SendMessage(WireL1.MakeInvalidMessage(Symbol));
                else
                    connection.SendMessage(_l1Data.MakeMessage(Symbol));
            }
            if ((context == CONTEXT_ALL) || (context == CONTEXT_TOS))
            {
                if (_tosData == null)
                    connection.SendMessage(WireTos.MakeInvalidMessage(Symbol));
                else
                    connection.SendMessage(_tosData.MakeMessage(Symbol));
            }
        }

        // Sends the current state, as selected by context, to every subscribed connection.
        protected void SendAll(int context)
        {
            foreach (ConnectionListener connection in _connections)
                Send(connection, context);
        }

        // Something new came from the data provider.
        public abstract void NewData(object data);

        // These are convenience functions to help you decide whether or not to notify a listener.
        // Generally speaking, if a field changed from the last thing we sent out, that's a reason to notify the
        // listener.  Each overload stores newValue into field and sets somethingChanged to true when the two
        // differ; somethingChanged is never reset to false, so one flag can accumulate over several calls.
        //
        // TOS is something of a special case.  When we receive a new print from the data provider, first we fill
        // in all the details, including saying that this is a new print.  Then we send out the message to everyone.
        // Then we immediately clear the new print field.  So if someone asks for the current state, they won't
        // mistake that message for a new print event.  So, the next time we get a new print, we know at least that
        // one field will change.  So, every new print event is guaranteed to cause a new message to be sent.

        public static void Change(ref DateTime? field, DateTime? newValue, ref bool somethingChanged)
        {
            if (field != newValue)
            {
                field = newValue;
                somethingChanged = true;
            }
        }

        public static void Change(ref long field, long newValue, ref bool somethingChanged)
        {
            if (field != newValue)
            {
                field = newValue;
                somethingChanged = true;
            }
        }

        public static void Change(ref double field, double newValue, ref bool somethingChanged)
        {
            // Equals() rather than != because NaN != NaN is always true, which would report a
            // change on every single update for a field that simply stays NaN.
            if (!field.Equals(newValue))
            {
                field = newValue;
                somethingChanged = true;
            }
        }

        public static void Change(ref string field, string newValue, ref bool somethingChanged)
        {
            if (field != newValue)
            {
                field = newValue;
                somethingChanged = true;
            }
        }

        public static void Change(ref bool field, bool newValue, ref bool somethingChanged)
        {
            if (field != newValue)
            {
                field = newValue;
                somethingChanged = true;
            }
        }

        public override string ToString()
        {
            // This is aimed at debugging.
            StringBuilder result = new StringBuilder();
            result.Append("Symbol:  ");
            result.Append(Symbol);
            result.Append("\r\n");
            if (_l1Data != null)
            {
                result.Append(_l1Data.ToString());
                result.Append("\r\n");
            }
            if (_tosData != null)
            {
                result.Append(_tosData.ToString());
                result.Append("\r\n");
            }
            return result.ToString();
        }
    }
}