using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Xml;
using TradeIdeas.MiscSupport;
using TradeIdeas.ServerConnection;
using TradeIdeas.TIProData.Interfaces;
using TradeIdeas.XML;

/* This unit is responsible for managing the streaming alerts.  This is
 * different from the delphi version because we broke up the history and
 * oddsmaker requests into different classes from this one.  Also, each
 * streaming alerts object can only be configured once.  If you want to
 * receive different alerts, throw away this object and request another.
 */

// TODO -- Should we be updating the server time / delay here?
// TODO -- On a hard reset we should reset the alert id.

namespace TradeIdeas.TIProData
{
    // The alerts will come in chronological order.  The simplest thing to do is to foreach
    // over the data and add the alerts one at a time.  The alerts are grouped into lists
    // somewhat arbitrarily.  This strategy will let you see the same thing no matter
    // how the alerts are grouped.  Note:  The alerts are delivered by the server in the
    // opposite order.
    // NOTE(review): the comment in DecodeAlerts() says the list is no longer reversed and
    // that the first item is the oldest, so the "opposite order" remark may be stale.
    public delegate void StreamingAlertsData(List<RowData> data, StreamingAlerts source);

    // Fired when the server's view of the configuration (config string, window name or
    // columns) differs from what the StreamingAlerts object currently holds.
    public delegate void StreamingAlertsConfig(StreamingAlerts source);

    /// <summary>
    /// Usually you'll look these fields up in the metadata.  Some are used a lot, not
    /// just when the user requests them.  It was nice to have a simple way to access
    /// these.  And these are exactly the types of constants that I'm afraid of, so at
    /// least we can mitigate that by putting them all in the same place and not
    /// copying them all over the code.
    ///
    /// Note that the exact field name can depend on different things.  Symbol, in
    /// particular, is capitalized differently for alerts and top lists.  Also, some
    /// of these fields are different in history vs live.  That's unfortunate, but
    /// I upgraded some of the live server stuff and not the historical version, yet.
    /// </summary>
    public static class CommonAlertFields
    {
        /// <summary>
        /// A row data object is really just a wrapper around a dictionary.  The keys are
        /// mostly strings.  These keys are copied directly from the element names in the
        /// XML.  If the base name is "Price" you might see "MinPrice" and "MaxPrice"
        /// in a collaborate string.  And you'll see "c_Price" as the wire name.
        ///
        /// We add "c_" for a couple of reasons.  1) Some of the internal names are not
        /// valid XML names.  In particular, some start with a digit instead of a letter.
        /// Originally we always said "Max" or "Min" first, so this wasn't a problem.
        /// That worked for specifying a filter, but not for specifying a column value.
        /// 2) This effectively creates a special namespace for all the standard filters.
        /// That means that we can include special fields like "ALT_DESCRIPTION" without
        /// a conflict.
        ///
        /// Note that the keys do not have to be strings.  String keys are reserved for
        /// things that we read from the XML, usually from the server.  You can use any
        /// object as a key, so if you're adding your own keys you won't have a conflict.
        ///
        /// Originally these wire names were not constants.  It was something like "c_0"
        /// for the first column, "c_1" for the second, etc.  You were only supposed to
        /// read these keys from the metadata, and not make them any other way.  There
        /// were issues.  Like an automatic reconnect.  Also consider the MultiStrategy
        /// window.  It doesn't clear each time you add or remove columns.  And it gets
        /// multiple meta data messages.  Eventually we realized that the server had to
        /// assign these names in a predictable and repeatable way.  Eventually the
        /// client started to save the names of some well known fields, like the listed
        /// exchange.  Finally I added this function so you could find any field without
        /// looking at the meta data.
        ///
        /// Note that the TopListRequest object makes the meta data optional in some
        /// cases, and not available at all in other cases.  CommonAlertFields started
        /// as a convenience.  But now CommonAlertFields is required in these cases
        /// where you have no meta data.
        /// </summary>
        /// <param name="internalCode">Something like "Price" or "Vol" or "Vol3M"</param>
        /// <returns>A key for use with a RowData object.</returns>
        public static string GetFilterWireName(string internalCode)
        {
            if (null == internalCode)
                // We're not specific what happens this case.  This seems simple enough.
                return null;
            return "c_" + internalCode;
        }

        /// <summary>
        /// The alert type, like new high or running down.  This is the standard internal
        /// code used in the config window and a number of other places.  We generally
        /// don't advertise this to the end user, but it's not a secret either.  The
        /// anchor for a help URL, for example, will contain this code.
        ///
        /// The old code would always send "TYPE".  It would also send "c_D_Type" if the
        /// user asked for the type column.  It was hard to avoid that duplication because
        /// the server had to be compatible with so many different clients.  The 4.x
        /// client is now talking with the new alert micro server, so we're getting a
        /// fresh start.  The new plan is to always send this one field whether the user
        /// asked for it or not.
        /// </summary>
        private static readonly object ALERT_TYPE = GetFilterWireName("D_Type");

        /// <summary>
        /// Read the alert type from a row, trying the preferred key first and the
        /// legacy "TYPE" key second.
        /// </summary>
        /// <param name="row">The row to read from.</param>
        /// <param name="ifNotFound">Returned when neither key yields a value.</param>
        /// <returns>The alert type code, or <paramref name="ifNotFound"/>.</returns>
        public static string GetAlertType(this RowData row, String ifNotFound = "")
        {
            return row.GetAsString(ALERT_TYPE, null)    // Preferred.
                ?? row.GetAsString("TYPE", null)        // Some older code.
                ?? ifNotFound;
        }

        /// <summary>
        /// The price column isn't always included in the rowdata, but this is used so much
        /// in the robot that it seems like a good idea to create this for simplicity.
        /// </summary>
        public static readonly object PRICE = GetFilterWireName("Price");

        /// <summary>
        /// Read the price column from a row.
        /// </summary>
        /// <param name="row">The row to read from.</param>
        /// <param name="ifNotFound">Passed to GetAsDouble as the fallback value.</param>
        /// <returns>Whatever RowData.GetAsDouble returns for the price key.</returns>
        public static double GetPrice(this RowData row, double ifNotFound = 0.0)
        {
            return row.GetAsDouble(PRICE, ifNotFound);
        }

        /// <summary>
        /// An English text description of what's happening.
        /// Originally we displayed this for the user.  Now we format a message ourselves
        /// using the alert type, the info in ALT_DESCRIPTION, and possibly other data.
        /// This is still used as a fallback if the new logic fails.
        ///
        /// The old code would always use "DESCRIPTION".
        /// </summary>
        public static readonly string DESCRIPTION = GetFilterWireName("D_Desc");

        // A list of fields present in the description.  Description is complete and ready
        // to send to the user.  This is made for a computer to parse.
        public static readonly string ALT_DESCRIPTION = "ALT_DESCRIPTION";

        /// <summary>
        /// The internal code for the symbol column.
        /// </summary>
        public static readonly string SYMBOL_INTERNAL = "D_Symbol";

        /// <summary>
        /// This is the preferred way to ask for the symbol.  The symbol can actually come
        /// to us with different keys.  So you should always use GetSymbol(), not this
        /// constant, to read the symbol from a RowData object.
        /// </summary>
        // Must stay below SYMBOL_INTERNAL: static initializers run in textual order.
        public static readonly string SYMBOL = GetFilterWireName(SYMBOL_INTERNAL);

        /// <summary>
        /// Read the symbol from a row, trying each of the keys under which it is known
        /// to arrive.  The first key that yields a non-null value wins.
        /// </summary>
        /// <param name="row">The row to read from.</param>
        /// <param name="ifNotFound">Returned when none of the keys yields a value.</param>
        /// <returns>The symbol, or <paramref name="ifNotFound"/>.</returns>
        public static string GetSymbol(this RowData row, String ifNotFound = "")
        {
            return row.GetAsString(SYMBOL, null)        // Preferred.
                ?? row.GetAsString("SYMBOL", null)      // Older alerts code.
                ?? row.GetAsString("symbol", null)      // Some top list code.
                ?? row.GetAsString("Symbol", null)      // Some AI code.
                ?? ifNotFound;
        }

        public static readonly string EXCHANGE = GetFilterWireName("D_Exch");

        /// <summary>
        /// Read the exchange from a row, trying the preferred key first and the
        /// legacy "EXCHANGE" key second.
        /// </summary>
        /// <param name="row">The row to read from.</param>
        /// <param name="ifNotFound">Returned when neither key yields a value.</param>
        /// <returns>The exchange, or <paramref name="ifNotFound"/>.</returns>
        public static string GetExchange(this RowData row, String ifNotFound = "")
        {
            return row.GetAsString(EXCHANGE, null)      // Preferred.
                ?? row.GetAsString("EXCHANGE", null)    // Some older code.
                ?? ifNotFound;
        }

        public static readonly string COMPANY_NAME = GetFilterWireName("D_Name");

        // Suffixes that get cut off (together with everything after them) by GetCompanyName().
        private static readonly List<string> companyNameFilterList = new List<string>()
        {
            " - American Depositary Shares",
            " - American Depository Shares",
            " - Common shares",
            " - Class "
        };

        /// <summary>
        /// Read the company name from a row and shorten it if it contains one of the
        /// phrases in companyNameFilterList.  Only the first matching phrase (in list
        /// order) is applied.
        /// </summary>
        /// <param name="row">The row to read from.</param>
        /// <param name="ifNotFound">
        /// Fallback passed to GetAsString.  Note that the fallback goes through the
        /// same shortening logic as a real name.
        /// </param>
        /// <returns>The (possibly shortened) company name.</returns>
        public static string GetCompanyName(this RowData row, String ifNotFound = "")
        {
            // Shorten company name containing a string from companyNameFilterList.
            string filteredCompanyName = row.GetAsString(COMPANY_NAME, ifNotFound);
            if (String.IsNullOrEmpty(filteredCompanyName))
                return filteredCompanyName;

            foreach (string phrase in companyNameFilterList)
            {
                // These are fixed, machine-generated suffixes, so compare ordinally
                // rather than with the current culture.
                int index = filteredCompanyName.IndexOf(phrase, StringComparison.Ordinal);
                // "> 0" rather than ">= 0":  a name that starts with the phrase is
                // left alone instead of being cut down to an empty string.
                if (index > 0)
                {
                    filteredCompanyName = filteredCompanyName.Substring(0, index);
                    break;
                }
            }
            return filteredCompanyName;
        }

        private static readonly string TIME = GetFilterWireName("D_Time");

        /// <summary>
        /// Read the time field (preferred key first, then "TIME") and decode it with
        /// ServerFormats.DecodeServerTime.
        /// </summary>
        /// <param name="row">The row to read from.</param>
        /// <returns>Whatever DecodeServerTime returns for the string that was found.</returns>
        public static DateTime? GetTime(this RowData row)
        {
            string asString = row.GetAsString(TIME, null)   // Preferred.
                ?? row.GetAsString("TIME", null);
            return ServerFormats.DecodeServerTime(asString);
        }

        /// <summary>
        /// Same as <see cref="GetTime(RowData)"/>, but with a fallback instead of null.
        /// </summary>
        /// <param name="row">The row to read from.</param>
        /// <param name="ifNotFound">Returned when no time could be decoded.</param>
        /// <returns>The decoded time, or <paramref name="ifNotFound"/>.</returns>
        public static DateTime GetTime(this RowData row, DateTime ifNotFound)
        {
            return row.GetTime() ?? ifNotFound;
        }

        /// <summary>
        /// Read the time field, convert it into a time, then convert that into time_t format.
        /// </summary>
        /// <param name="row">The row to read from.</param>
        /// <returns>0 for error, the time in time_t format for anything else</returns>
        public static long GetTimeT(this RowData row)
        {
            return ServerFormats.ToTimeT(row.GetTime());
        }

        private static readonly string QUALITY = GetFilterWireName("D_Quality");

        /// <summary>
        /// Read the quality field as a string, trying the preferred key first and the
        /// "QUALITY" key second.
        /// </summary>
        /// <param name="row">The row to read from.</param>
        /// <param name="ifNotFound">Returned when neither key yields a value.</param>
        /// <returns>The raw quality string, or <paramref name="ifNotFound"/>.</returns>
        public static String GetQualityString(this RowData row, string ifNotFound = "")
        {
            return row.GetAsString(QUALITY, null)       // Preferred.
                ?? row.GetAsString("QUALITY", null)
                ?? ifNotFound;
        }

        /// <summary>
        /// Ideally this would be private.  But I don't want to rewrite and test large
        /// portions of DescriptionFormatter.cs.
        /// </summary>
        /// <param name="row">The row to read from.</param>
        /// <returns>The quality as a number, or null if ServerFormats.TryParse rejects it.</returns>
        public static double? GetQuality(this RowData row)
        {
            string asString = GetQualityString(row);
            double result;
            if (ServerFormats.TryParse(asString, out result))
                return result;
            else
                return null;
        }

        /// <summary>
        /// Same as <see cref="GetQuality(RowData)"/>, but with a fallback instead of null.
        /// </summary>
        /// <param name="row">The row to read from.</param>
        /// <param name="ifNotFound">Returned when the quality is missing or unparsable.</param>
        /// <returns>The quality, or <paramref name="ifNotFound"/>.</returns>
        public static double GetQuality(this RowData row, double ifNotFound)
        {
            return row.GetQuality() ?? ifNotFound;
        }

        // Blank or 0 means live data.  1 means short history.  We don't like to show
        // an empty window, so if there are no alerts we will try to go back and find
        // one historical record.  2 means long history.  That's when the user
        // specifically asked for history.
        public static readonly string HISTORICAL = "HISTORICAL";
    }

    public interface StreamingAlerts
    {
        event StreamingAlertsData StreamingAlertsData;
        event StreamingAlertsConfig StreamingAlertsConfig;

        // Call start to start receiving the data.  Nothing will be sent to the server until
        // you do this.  This gives you time to register for the callbacks.  Also, this gives
        // you time to store a copy of this object so when you get a response you are sure
        // it is from the current object.
        void Start();

        // Always call stop when you are done.  Otherwise the server will keep sending alerts
        // and the client will have to parse them.  This object will not be garbage collected
        // before you call stop, so just ignoring this object is not enough.
        void Stop();

        // Status.  This is the response from the server to our config.  This will get more
        // interesting when we can configure the columns.

        // Config is initially whatever the client specified, but the server might overwrite
        // it with something slightly different.  The server's version is the canonical
        // version and should be used whenever possible.
        string Config { get; }

        // WindowName is null until we hear from the server.  Past experience suggests that
        // it's best to leave the window name blank before this is filled in.
        string WindowName { get; }

        // This might be null before we hear from the server.
        IList<ColumnInfo> Columns { get; }
    }

    public class StreamingAlertsManager
    {
        private class StreamingAlertsImpl : StreamingAlerts
        {
            // Shared by all instances; bumped atomically to build a unique strategy id.
            private static Int64 _lastId = 0;
            private readonly string _id;
            private readonly StreamingAlertsManager _manager;

            public string Config { get; private set; }
            public string WindowName { get; private set; }
            public IList<ColumnInfo> Columns { get; private set; }

            public event StreamingAlertsData StreamingAlertsData;
            public event StreamingAlertsConfig StreamingAlertsConfig;

            private readonly bool _saveToMru;

            public StreamingAlertsImpl(string config, bool saveToMru, StreamingAlertsManager manager)
            {
                _saveToMru = saveToMru;
                _id = "SA" + System.Threading.Interlocked.Increment(ref _lastId);
                Config = config;
                _manager = manager;
            }

            // Send (or re-send) the "ms_alert_start" request for this window.  The reply
            // is routed to Response() below.
            public void SendConfig()
            {
                _manager._sendManager.SendMessage(
                    TalkWithServer.CreateMessage(
                        "command", "ms_alert_start",
                        "strategy_id", _id,
                        "long_form", Config,
                        /*
                        "custom_columns", 1,   // The server needs this to know that we are a newer client.
                                               // Older clients get the built in columns.
                        "non_filter_columns", 1,
                        */
                        // skip_history is not used by the alert micro services server.  We decided that the client
                        // should make a separate request for history if it wants history.  That makes things
                        // much simpler on the server side.
                        //"skip_history", _dataPresent ? "1" : "0",
                        "save_to_mru", _saveToMru ? "1" : "0"),
                    Response);
                //System.Diagnostics.Debug.Print(DateTime.Now + ": ms_alert_start strategy_id=" + _id);
            }

            // Handles the server's reply to SendConfig().  Updates Config, WindowName and
            // Columns and fires StreamingAlertsConfig if any of them changed.
            private void Response(byte[] body, object unused)
            {
                XmlNode wholeFile = XmlHelper.Get(body);
                if (null == wholeFile)
                    // Assume the only error is that the server connection was broken.
                    // That will automatically be fixed elsewhere.
                    return;

                // The Delphi code would look for 'TI-ErrorMsg' here.  I don't see that
                // in the server, and I don't think it's existed in the server for a
                // long time.
                XmlNode status = wholeFile.Node(0).Node("STATUS");
                string clientCookie = status.Node("CLIENT_COOKIE").Property("VALUE"); // TODO -- Use this!
                string shortForm = status.Property("SHORT_FORM");
                string windowName = status.Property("WINDOW_NAME");

                List<ColumnInfo> columns = new List<ColumnInfo>();
                foreach (XmlNode columnXml in status.Node("COLUMNS").Enum())
                    columns.Add(new ColumnInfo(columnXml, "ALERT_WINDOW"));

                bool unchanged = (shortForm == Config)
                    && (windowName == WindowName)
                    && ColumnInfo.Equal(columns, Columns);
                if (unchanged)
                    return;

                Config = shortForm;
                WindowName = windowName;
                Columns = columns.AsReadOnly();

                StreamingAlertsConfig callback = StreamingAlertsConfig;
                if (null != callback)
                {
                    try
                    {
                        callback(this);
                    }
                    catch (Exception ex)
                    {
                        // Keep working even if the callback fails, but leave a trace
                        // for whoever is debugging the listener.
                        Debug.WriteLine("StreamingAlertsConfig callback failed: " + ex);
                    }
                }
            }

            // Copy every XML attribute of one alert node into a fresh RowData, keyed by
            // the attribute's local name.
            private static RowData DecodeAlert(XmlNode asXml)
            {
                RowData result = new RowData();
                foreach (XmlAttribute attribute in asXml.Attributes)
                    result.Data[attribute.LocalName] = attribute.Value;
                return result;
            }

            // Decode every child yielded by asXml.Enum() into a RowData, preserving order.
            private static List<RowData> DecodeAlerts(XmlNode asXml)
            {
                List<RowData> result = new List<RowData>();
                foreach (XmlNode alertNode in asXml.Enum())
                {
                    // XmlNode.Attributes is null for anything that is not an element.
                    // Skip such a node rather than let it throw and (via the catch in
                    // SendAlerts) take the whole batch down with it.
                    if (null == alertNode.Attributes)
                        continue;
                    result.Add(DecodeAlert(alertNode));
                }
                //result.Reverse();  We used to reverse it.  Now we get them in the order
                // we need them in.  The first item in the list is the oldest.
                //if (result.Count > 0)
                //    System.Diagnostics.Debug.WriteLine(result.Count + " alerts.  " + DateTime.Now);
                return result;
            }

            // Decode a block of alerts addressed to this window and hand it to the
            // StreamingAlertsData listeners.  Nothing is decoded if nobody is listening.
            public void SendAlerts(XmlNode asXml)
            {
                StreamingAlertsData callback = StreamingAlertsData;
                if (null == callback)
                    return;
                try
                {
                    List<RowData> asList = DecodeAlerts(asXml);
                    callback(asList, this);
                }
                catch (Exception ex)
                {
                    // Keep working even if the callback fails, but leave a trace
                    // for whoever is debugging the listener.
                    Debug.WriteLine("StreamingAlertsData callback failed: " + ex);
                }
            }

            private bool _started = false;

            public void Start()
            {
                lock (_manager._mutex)
                {
                    _manager.CheckListenStatus(addInProgress: true);
                    Debug.Assert(!_started, "Only call StreamingAlerts.Start() once per object.");
                    _started = true;
                    _manager._windows.Add(_id, this);
                }
                SendConfig();
            }

            public void Stop()
            {
                // Stop the data from the server.
                _manager.CancelById(_id);
                // The error message is not quite right, but this is close enough.  It is not
                // right to call Stop() followed by Start().  It is acceptable to call Stop()
                // multiple times, and to call it without calling Start() first.
                _started = false;
            }
        }

        // Create a new, not yet started, streaming alerts object for the given config.
        public StreamingAlerts GetAlerts(string config, bool saveToMru = false)
        {
            return new StreamingAlertsImpl(config, saveToMru, this);
        }

        // Guards _windows and _connection.
        private readonly object _mutex = new Object();
        // Active windows, keyed by strategy id.
        private readonly Dictionary<string, StreamingAlertsImpl> _windows =
            new Dictionary<string, StreamingAlertsImpl>();
        private readonly ISendManager _sendManager;
        // Restarted whenever we send the listen command or receive data on it.
        private readonly Stopwatch _timeoutStopwatch = new Stopwatch();

        internal StreamingAlertsManager(ConnectionLifeCycle connectionLifeCycle, ISendManager sendManager)
        {
            connectionLifeCycle.OnTimer += new ConnectionLifeCycle.Callback(OnTimer);
            _sendManager = sendManager;
        }

        private void OnTimer(ConnectionLifeCycle sender)
        {
            if (sender.Status != ConnectionLifeCycle.ConnectionStatus.Full)
                return;
            if (_timeoutStopwatch.Elapsed.TotalSeconds > 15.0)
            {
                // This is almost redundant.  The ping timer is also able to restart the connection
                // if we don't see any activity.  This is better because it can look for a number
                // of problems.  If we are connected to a server, but the data is very slow, or if
                // there is an internal error in the client, we will not get data and this timer
                // will reconnect us, hopefully fixing the problem.
                // NOTE(review): _windows is read here without taking _mutex, unlike every
                // other access in this class -- confirm that this is intentional.
                if (_windows.Count > 0)
                {
                    sender.SoftReset();
                }
                _timeoutStopwatch.Restart();
            }
        }

        // Token identifying the current listen request; null when there is none.
        private object _connection;

        /// <summary>
        /// Create a new connection to the alert server, if required.
        /// This might do nothing because no connection is required.
        /// This might do nothing because there is already a connection.
        /// This might send a "listen" command to get results from that server.
        /// </summary>
        /// <param name="addInProgress">
        /// If this is true definitely want a connection.
        /// If this is false, the method will check if there are already pending requests.
        /// </param>
        private void CheckListenStatus(bool addInProgress)
        {
            lock (_mutex)
            {
                if (null != _connection)
                    return;
                if ((_windows.Count == 0) && !addInProgress)
                    return;

                object connection = new object();
                _connection = connection;
                _sendManager.SendMessage(
                    TalkWithServer.CreateMessage("command", "ms_alert_listen", "next_id", _nextAlertId),
                    Response, true, connection);
                _timeoutStopwatch.Restart();
                //System.Diagnostics.Debug.Print(DateTime.Now + ": ms_alert_listen next_id=" + _nextAlertId);

                foreach (StreamingAlertsImpl window in _windows.Values)
                    // Send the initial config while in the lock.  Someone could be trying
                    // to cancel a request right now, and we don't want those two things to
                    // get out of order.
                    window.SendConfig();
            }
        }

        // Diagnostic only:  how many data blocks arrived for a window we no longer know.
        private int _deletedWindowDataCount = 0;
        // Last NEXT_ID the server gave us; sent back as next_id with the listen command.
        private string _nextAlertId;

        // Handles everything that arrives on the listen request:  a null body (the
        // request ended, so try to listen again) or a batch of alert data.
        private void Response(byte[] body, object connection)
        {
            if (_connection != connection)
                // Old message.
                return;

            if (null == body)
            {
                lock (_mutex)
                {
                    if (_connection != connection)
                        // Old message.
                        return;
                    _connection = null;
                    CheckListenStatus(addInProgress: false);
                    return;
                }
            }

            XmlNode wholeFile = XmlHelper.Get(body);
            if (null == wholeFile)
                return;
            _timeoutStopwatch.Restart();

            foreach (XmlNode windowData in wholeFile.Node(0).Node("DATA").Enum())
            {
                StreamingAlertsImpl window = null;
                string id = windowData.Property("ID");
                lock (_mutex)
                {
                    // A null key would make the dictionary throw, so treat a missing id
                    // the same way as an unknown one.
                    if (null != id)
                        _windows.TryGetValue(id, out window);
                }
                if (null == window)
                    // This shouldn't happen very often.  But sometimes it's unavoidable.
                    // We could have sent a cancel message and the server was sending us
                    // data, and the two messages crossed.  It's tempting to send another
                    // cancel message here, just to be safe.  But that should not be
                    // necessary.  It's more tempting just to have a breakpoint here.
                    _deletedWindowDataCount++;
                else
                    // parse the data and send it out.
                    window.SendAlerts(windowData);
                // TODO:  Look at this closer.  Could we possibly send two of these messages
                // on the same connection?
            }

            string nextAlertId = wholeFile.Node(0).Node("RESTART").Property("NEXT_ID", null);
            if (null != nextAlertId)
                _nextAlertId = nextAlertId;
        }

        // Forget the window and tell the server to stop sending data for it.  Both steps
        // happen under the lock so they cannot interleave with CheckListenStatus().
        private void CancelById(string id)
        {
            lock (_mutex)
            {
                _windows.Remove(id);
                _sendManager.SendMessage(
                    TalkWithServer.CreateMessage(
                        "command", "ms_alert_stop",
                        "strategy_id", id));
                //System.Diagnostics.Debug.Print(DateTime.Now + ": ms_alert_stop strategy_id=" + id);
            }
        }
    }
}