using System;
using System.Collections.Generic;
using TradeIdeas.MiscSupport;
using TradeIdeas.ServerConnection;
using TradeIdeas.TIProData.Interfaces;

namespace TradeIdeas.TIProData
{
    /// <summary>
    /// This handles the data that was available to the Delphi TIQ client.  We still use that format
    /// for a lot of things.  In particular, sending alerts.  The old TIQ server will not be rewritten
    /// any time soon.  And the new TIQ (tikiller) server will send a lot of similar data.
    ///
    /// These messages are called alerts, however we actually send a lot of data this way.  Some things
    /// are more like normal top lists.  The format is very flexible and you can send almost anything.
    ///
    /// Part of this was abstracted from TMainConnection in MainConnectionUnit.pas.  In particular, the
    /// names of the individual commands for the server came from there.  A lot of that Pascal code has
    /// been moved to other parts of IConnectionMaster in the C# code.
    ///
    /// The overall structure for this class was inspired more by StreamingResponseManager in the C#
    /// code.  In particular, all of the C# code is thread safe while the Delphi code assumes that
    /// most of the work will happen in the GUI thread.
    /// </summary>
    public class TraditionalAlertsManager
    {
        /// <summary>
        /// Signature of a subscriber callback.
        /// </summary>
        /// <param name="category">The category the message was delivered under.</param>
        /// <param name="message">The rest of the message, after the category line.</param>
        public delegate void OnResponse(String category, String message);

        /// <summary>
        /// Guards <see cref="_subscriptions"/> and writes to <see cref="_connection"/>.
        /// </summary>
        private readonly Object _mutex = new object();

        /// <summary>
        /// Maps each category to the combined delegate of everyone listening to it.
        /// This is protected by _mutex.
        /// </summary>
        private readonly Dictionary<string, OnResponse> _subscriptions = new Dictionary<string, OnResponse>();

        private readonly ISendManager _sendManager;

        internal TraditionalAlertsManager(ISendManager sendManager)
        {
            _sendManager = sendManager;
        }

        /// <summary>
        /// A token identifying the current "listen" request, or null when we are not listening.
        /// It is handed to the send manager and comes back as the second argument of
        /// <see cref="Response"/>, which lets us recognize responses to an older request.
        /// Only written while holding _mutex.
        /// </summary>
        private object _connection;

        /// <summary>
        /// Create a new connection to the alert server, if required.
        /// This might do nothing because no connection is required.
        /// This might do nothing because there is already a connection.
        /// This might send a "listen" command to get results from that server.
        /// </summary>
        /// <param name="addInProgress">
        /// If this is true we definitely want a connection.
        /// If this is false, the method will check if there are already pending requests.
        /// </param>
        private void CheckListenStatus(bool addInProgress)
        {
            lock (_mutex)
            {
                if (null != _connection)
                    // Already listening.
                    return;
                if ((_subscriptions.Count == 0) && !addInProgress)
                    // Nobody wants any data.
                    return;
                _connection = new object();
                _sendManager.SendMessage(
                    TalkWithServer.CreateMessage("command", "listen_for_alerts"),
                    Response, true, _connection);
                // A new listen request starts from scratch, so repeat every existing subscription.
                foreach (var nvp in _subscriptions)
                    _sendManager.SendMessage(
                        TalkWithServer.CreateMessage("command", "add_alert_category", "category", nvp.Key));
            }
        }

        /// <summary>
        /// Request data.
        /// This is thread safe.  You can call it from any thread.
        /// Note:  You can request multiple callbacks for a given category.  That works well.
        /// Requesting the same callback/category pair more than once is not well defined.
        /// Currently we use a single value of type OnResponse and use the + operator on it.
        /// I believe that allows duplicates.  That might change to a HashSet of OnResponse
        /// objects.  That would ignore duplicates.
        /// </summary>
        /// <param name="category">What to subscribe to.</param>
        /// <param name="callback">
        /// Where to send the responses.  This might be called from any thread.
        /// This should not throw an exception.  This object has some protection against that
        /// but the callback function can do a better job.
        /// </param>
        public void Subscribe(string category, OnResponse callback)
        {
            lock (_mutex)
            {
                OnResponse callbackList;
                // Do this before touching _subscriptions.  If it has to start listening it
                // replays the categories already in the table, and the new category is sent
                // separately below.
                CheckListenStatus(addInProgress: true);
                if (_subscriptions.TryGetValue(category, out callbackList))
                    // The server is already sending us data.  Add one more internal listener.
                    _subscriptions[category] = callbackList + callback;
                else
                {
                    // This is the first request for this category.  Forward the request to the server.
                    _subscriptions[category] = callback;
                    if (null != _connection)
                        _sendManager.SendMessage(
                            TalkWithServer.CreateMessage("command", "add_alert_category", "category", category));
                }
            }
        }

        /// <summary>
        /// Remove a subscription.
        /// This is thread safe and can be called from any thread.
        /// As is typical with a thread safe callback, this does not guarantee that the callbacks will
        /// stop immediately.  However, you should still call this to make sure that resources are
        /// (eventually) disposed of.
        /// If there is no such subscription, that's acceptable.  No error is generated.
        /// </summary>
        /// <param name="category">The same as in the call to <see cref="Subscribe"/>.</param>
        /// <param name="callback">The same as in the call to <see cref="Subscribe"/>.</param>
        public void Unsubscribe(string category, OnResponse callback)
        {
            lock (_mutex)
            {
                OnResponse callbackList;
                if (_subscriptions.TryGetValue(category, out callbackList))
                {
                    callbackList -= callback;
                    if (null == callbackList)
                    {
                        // That was the last listener.  Tell the server to stop sending us this data.
                        _subscriptions.Remove(category);
                        if (null != _connection)
                            _sendManager.SendMessage(
                                TalkWithServer.CreateMessage("command", "delete_alert_category", "category", category));
                    }
                    else
                        // There are other internal listeners.
                        _subscriptions[category] = callbackList;
                }
            }
        }

        private static readonly string[] FIELD_SEPERATOR = new string[] { "\r\n" };

        /// <summary>
        /// Callback given to the send manager along with the "listen_for_alerts" command.
        /// A null body means the connection was broken; in that case we forget the connection and
        /// start listening again if anyone is still subscribed.  Otherwise the body is split at
        /// the first "\r\n" into a category and a message, and forwarded to the subscribers.
        /// </summary>
        /// <param name="body">The raw message, or null if the connection was broken.</param>
        /// <param name="serverConnection">
        /// The token that was passed with the listen request.  Anything that does not match the
        /// current <see cref="_connection"/> belongs to an older request and is ignored.
        /// </param>
        private void Response(byte[] body, object serverConnection)
        {
            // Quick check without the lock.  The broken-connection branch re-checks under the lock
            // before changing anything.
            if (_connection != serverConnection)
                // Old message.
                return;
            if (null == body)
            {
                // The connection was broken.
                lock (_mutex)
                {
                    if (_connection != serverConnection)
                        // Old message.
                        return;
                    _connection = null;
                    CheckListenStatus(addInProgress: false);
                }
            }
            else
            {
                string wholeMessage = TalkWithServer.BytesToStr(body);
                // Only split once.  The message itself may contain more line breaks.
                string[] pieces = wholeMessage.Split(FIELD_SEPERATOR, 2, StringSplitOptions.None);
                string category = pieces[0];
                string message = "";
                if (pieces.Length > 1)
                    message = pieces[1];
                SendMessage(category, message);
            }
        }

        /// <summary>
        /// Send a message to anyone who has subscribed to that message.
        /// This is public for the sake of sending debug data to the client.
        /// Normally this is called by this class when data comes from the server.
        /// This can be called from any thread.
        /// </summary>
        /// <param name="category">Which subscribers to notify.</param>
        /// <param name="message">Passed as is to each subscriber.</param>
        public void SendMessage(string category, string message)
        {
            // A delegate instance is immutable, so the value we read under the lock is a stable
            // snapshot.  We release the lock before calling out so a slow or re-entrant listener
            // cannot block Subscribe() / Unsubscribe() on other threads.
            OnResponse callbackList;
            lock (_mutex)
            {
                _subscriptions.TryGetValue(category, out callbackList);
            }
            if (null == callbackList)
                return;
            // Invoke each listener inside its own handler.  Invoking the combined delegate
            // directly would stop at the first listener that throws and skip the rest.
            foreach (OnResponse listener in callbackList.GetInvocationList())
            {
                try
                {
                    listener(category, message);
                }
                catch
                {
                    // Deliberately ignored.  Listeners are documented as "should not throw";
                    // one bad listener must not break delivery to the others or to the caller.
                }
            }
        }

        private static readonly char[] EQUALS = new char[] { '=' };

        /// <summary>
        /// The messages can have any format.  But it is very common to encode an alert like a url.
        /// This function works like http://php.net/manual/en/function.parse-str.php
        /// </summary>
        /// <param name="raw">
        /// An encoded string from the server.
        /// Something like "symbol=DELL&amp;price=23.45&amp;description=New+high+price".
        /// A null or empty string yields an empty result.
        /// </param>
        /// <returns>The decoded version of the url string.</returns>
        public static RowData ParseOneRow(string raw)
        {
            // NOTE(review): Uri.UnescapeDataString() leaves '+' unchanged, whereas parse_str decodes
            // it as a space.  Confirm whether the server ever uses '+' for spaces; if so the values
            // need a Replace('+', ' ') before unescaping.
            RowData result = new RowData();
            if (string.IsNullOrEmpty(raw))
                return result;
            foreach (string pair in raw.Split('&'))
            {
                // Split on the first '=' only.  Pieces without an '=' are ignored.
                string[] pieces = pair.Split(EQUALS, 2);
                if (pieces.Length == 2)
                    result.Data[Uri.UnescapeDataString(pieces[0])] = Uri.UnescapeDataString(pieces[1]);
            }
            return result;
        }
    }
}