using System;
using System.Collections.Generic;
using System.Text;
using System.Xml;
using System.Diagnostics;
using System.Collections;
using TradeIdeas.ServerConnection;
using TradeIdeas.XML;
using TradeIdeas.TIProData.Interfaces;

namespace TradeIdeas.TIProData
{
    /// <summary>
    /// Manages the user's symbol lists.  Each list is exposed to the main program
    /// through the <see cref="List"/> interface.  Changes are batched per list,
    /// sent to the server one request at a time per list, and merged and resent
    /// after a failed request.
    /// </summary>
    public class ListManager
    {
        // What to do with the previous contents of a list before applying the
        // individual adds and deletes of a ListDelta.
        private enum OldDisposition
        {
            // Add or delete individual symbols, as described below, but
            // don't do anything else.
            Keep,

            // Delete all symbols from the list before adding the
            // specific ones listed below.
            DeleteSymbols,

            // Delete the symbols from the list and the list itself.
            // It will not appear in the GUI any more.
            DeleteList
        }

        // A set of pending changes for one list.
        private class ListDelta
        {
            // Add and delete these specific symbols.
            private HashSet<string> _toAdd = new HashSet<string>();
            private HashSet<string> _toDelete = new HashSet<string>();

            // Do this action first.
            private OldDisposition _oldDisposition;

            // Request the contents of the list from the server.
            private bool _listRequested;

            public void AddSymbol(string symbol)
            {
                _toAdd.Add(symbol);
                _toDelete.Remove(symbol);
                // Adding a symbol to a list that was going to be deleted keeps the
                // list alive; only its old contents are dropped.
                if (_oldDisposition == OldDisposition.DeleteList)
                    _oldDisposition = OldDisposition.DeleteSymbols;
            }

            public void DeleteSymbol(string symbol)
            {
                _toAdd.Remove(symbol);
                if (_oldDisposition == OldDisposition.Keep)
                    _toDelete.Add(symbol);
                else
                    // The old contents are being wiped anyway, so there is no need
                    // to name this symbol explicitly.
                    _oldDisposition = OldDisposition.DeleteSymbols;
            }

            public void SetAllSymbols(HashSet<string> symbols)
            {
                _oldDisposition = OldDisposition.DeleteSymbols;
                _toDelete.Clear();
                // It might work okay if we just pointed to symbols, but we'd need to
                // document it accordingly.  So we copy the caller's set instead.
                _toAdd = new HashSet<string>(symbols);
            }

            public void DeleteList()
            {
                _oldDisposition = OldDisposition.DeleteList;
                _toDelete.Clear();
                _toAdd.Clear();
            }

            public void RequestFromServer()
            {
                _listRequested = true;
            }

            public OldDisposition OldDisposition { get { return _oldDisposition; } }
            public bool ListRequested { get { return _listRequested; } }
            public HashSet<string> ToAdd { get { return _toAdd; } }
            public HashSet<string> ToDelete { get { return _toDelete; } }

            /* If a request fails, we don't necessarily send it right back.  First
             * we check to see if any new changes have been requested.  Then we
             * merge the new requests with the old ones.  This is all transparent
             * to the client.  The server will see one big message instead of two
             * small ones. */
            public static ListDelta Join(ListDelta moreRecent, ListDelta lessRecent)
            {
                // The output is the combination of the two objects.  Note that the
                // objects are likely to be modified and / or reused.
                if (moreRecent == null)
                    return lessRecent;

                if (lessRecent == null)
                    return moreRecent;

                if (moreRecent._oldDisposition != OldDisposition.Keep)
                {
                    // The newer delta wipes the old contents, so nothing from the
                    // older delta survives except a pending request for the list.
                    if (lessRecent._listRequested)
                        moreRecent._listRequested = true;
                    return moreRecent;
                }

                // The newer delta only has individual changes.  Replay them on top
                // of the older delta.
                if (moreRecent._listRequested)
                    lessRecent._listRequested = true;
                foreach (string symbol in moreRecent._toAdd)
                    lessRecent.AddSymbol(symbol);
                foreach (string symbol in moreRecent._toDelete)
                    lessRecent.DeleteSymbol(symbol);
                return lessRecent;
            }
        }

        // This is the bulk of how we deal with a single list.  This class allows
        // List and ListManager both to share a lot of data.
        private class ListInternals : List
        {
            // The longest "add" or "delete" field we will send, in characters.
            // See the long comment in Join() for where this number comes from.
            private const int MaxJoinedLength = 200000;

            private readonly int _id;
            private readonly ListManager _listManager;
            private readonly Object _mutex;
            private string _name;
            private OnListReturned _onOnListReturned;

            // _sentRequests contains anything that we've sent off, but has not yet
            // been confirmed.  We save a copy in case we need to send it again.
            // _heldRequests are requests that we have not sent yet.  This typically
            // happens when there are already outstanding requests but the main
            // program asks for more.
            private ListDelta _heldRequests, _sentRequests;

            // Queue this list for sending if it has held changes and nothing in
            // flight, then let the manager look for work.
            public void SendRequestIfReady()
            {
                lock (_mutex)
                {
                    if ((_heldRequests != null) && (_sentRequests == null))
                        // This item is ready.
                        _listManager.NeedsAttention(this);
                    _listManager.CheckForWork();
                }
            }

            // Build the "update_list" message from _heldRequests and send it.  The
            // held delta becomes the sent delta until Response() is called.
            public void SendRequestNow()
            {
                lock (_mutex)
                {
                    Debug.Assert((_heldRequests != null) && (_sentRequests == null), "List is not ready to be sent.");
                    ArrayList message = new ArrayList();
                    message.Add("id");
                    message.Add(_id);
                    message.Add("name");
                    message.Add(_name);
                    if (_heldRequests.OldDisposition == OldDisposition.DeleteList)
                    {
                        message.Add("delete_list");
                        message.Add("1");
                    }
                    else
                    {
                        if (_heldRequests.OldDisposition == OldDisposition.DeleteSymbols)
                        {
                            message.Add("delete_all");
                            message.Add("1");
                        }
                        if (_heldRequests.ToAdd.Count > 0)
                        {
                            message.Add("add");
                            message.Add(Join(_heldRequests.ToAdd));
                        }
                        if (_heldRequests.ToDelete.Count > 0)
                        {
                            message.Add("delete");
                            message.Add(Join(_heldRequests.ToDelete));
                        }
                        if (_heldRequests.ListRequested)
                        {
                            message.Add("request_list");
                            message.Add("1");
                        }
                    }
                    message.Add("command");
                    message.Add("update_list");
                    // Update our state before sending.  The response delegate might
                    // be called before SendMessage() returns.
                    _sentRequests = _heldRequests;
                    _heldRequests = null;
                    // NOTE(review): _serverConnection is null until the first
                    // OnConnection callback.  Confirm that this cannot be reached
                    // before then.
                    _listManager._serverConnection.SendMessage(TalkWithServer.CreateMessageFromArray(message.ToArray()), Response);
                }
            }

            // Concatenate every item, each followed by '\n'.  No length limit.
            private static string JoinAll(HashSet<string> items)
            {
                StringBuilder result = new StringBuilder();
                foreach (string item in items)
                {
                    result.Append(item);
                    result.Append('\n');
                }
                return result.ToString();
            }

            /// <summary>
            /// Combine all of the items into a single string.
            /// Add an \n as an item terminator, as required by the server.
            /// Limit the list, if necessary, to prevent a message that is too long and would be rejected by the server.
            /// When the list is limited it is cut after the last complete item that fits, so a partial
            /// symbol is never sent.
            /// </summary>
            /// <param name="items">The symbols to combine.</param>
            /// <returns>The terminated items, at most <see cref="MaxJoinedLength"/> characters long.</returns>
            private static string Join(HashSet<string> items)
            {
                string joinedSymbols = JoinAll(items);
                // If the symbol list is really large, just truncate it.  This was causing
                // a bug where the server would cause a disconnection and the client would
                // reconnect and continually try to update the list.
                if (joinedSymbols.Length > MaxJoinedLength)
                {
                    // Cut at the last terminator inside the limit.  Cutting at a fixed
                    // offset could split a symbol in two and send the first half as if
                    // it were a real symbol.  If no terminator fits (a single item longer
                    // than the limit) the result is empty.
                    int lastTerminator = joinedSymbols.LastIndexOf('\n', MaxJoinedLength - 1);
                    joinedSymbols = joinedSymbols.Substring(0, lastTerminator + 1);
                }
                return joinedSymbols;

                // This code for limiting the lists seems a bit crude, but it should suffice.
                // The server would cut this message off around 500,000 bytes.
                // We limit this field to 200,000 bytes to include some padding.
                // The following request, made on 8/24/2021, shows that you could make a list
                // including every symbol that the server knows about, and it would only require 166,589 bytes.
                //
                // mysql> select count(*),sum(length(d_symbol)+1),list_exch from alerts_daily where date=curdate() group by list_exch with rollup;
                // +----------+-------------------------+-----------+
                // | count(*) | sum(length(d_symbol)+1) | list_exch |
                // +----------+-------------------------+-----------+
                // |     2643 |                   15056 | NULL      |
                // |        3 |                      16 | $NDX      |
                // |      322 |                    1511 | AMEX      |
                // |     1709 |                    7977 | ARCA      |
                // |      516 |                    2531 | BATS      |
                // |     2354 |                   22271 | CAT       |
                // |     1346 |                   11274 | CAV       |
                // |     5115 |                   26186 | NASD      |
                // |     3734 |                   17777 | NYSE      |
                // |      637 |                    3454 | OQB       |
                // |      271 |                    1547 | OQX       |
                // |     9874 |                   55575 | PINK      |
                // |      106 |                    1396 | SMAL      |
                // |        3 |                      18 | TEST      |
                // |    28633 |                  166589 | NULL      |
                // +----------+-------------------------+-----------+
                // 15 rows in set(0.04 sec)

                // I also considered a way to go past this limit.  This function could delete symbols
                // from the items variable as they were moved to the string.  If we did not have
                // room for all the items, the caller would know.  The caller could use these remaining
                // items to set up the next request to send to the server.
                //
                // If the GUI requested to add 300,000 bytes worth of symbols, this library would send
                // the first batch, which would contain about 200,000 bytes worth of symbols, immediately.
                // This library would save the remaining 100,000 bytes worth of symbols for the next
                // request.
                //
                // It might get a little tricky.  Could some requests already be queued up after this
                // one?  (I don't think so, but need to check.)  If so, make sure that the requests
                // are merged correctly.  Could someone add a single symbol that was over 200,000 bytes?
                // Testing would be very tricky.  The current approach should be sufficient and will be
                // much easier to test than what I'm proposing here.

                // The relevant code on the server side is the maxBufferSize const in
                // cpp_alert_server/source/shared/CommandDispatcher.C.  The server has to have a limit
                // to keep from running out of memory.  The actual limit is arbitrary.
            }

            // Called with the server's reply to the request sent by SendRequestNow().
            // A body that does not parse is treated as a failure: the sent delta is
            // merged back into the held delta so that it will be sent again.
            private void Response(byte[] body, object unused)
            {
                OnListReturned sendCurrentResultsTo = null;
                HashSet<string> copyOfList = null;
                lock (_mutex)
                {
                    XmlDocument message = XmlHelper.Get(body);
                    if (message == null)
                    {
                        _listManager._recentError = true;
                        _heldRequests = ListDelta.Join(_heldRequests, _sentRequests);
                        _sentRequests = null;
                    }
                    else
                    {
                        if (_onOnListReturned != null)
                        {
                            sendCurrentResultsTo = _onOnListReturned;
                            _onOnListReturned = null;
                            copyOfList = new HashSet<string>();
                            XmlNode listFromServer = message.Node(0).Node("SYMBOLS");
                            if (listFromServer != null)
                            {
                                listFromServer = listFromServer.FirstChild;
                                while (listFromServer != null)
                                {
                                    string symbol = listFromServer.Property("NAME").Trim().ToUpper();
                                    if (symbol != "")
                                        copyOfList.Add(symbol);
                                    listFromServer = listFromServer.NextSibling;
                                }
                            }
                            // It is not an error if that node does not exist.  That just
                            // means the list is empty.
                            // TODO notify listeners.
                        }
                        _sentRequests = null;
                    }
                }
                SendRequestIfReady();
                // Call the listeners outside of the lock.
                if (sendCurrentResultsTo != null)
                    sendCurrentResultsTo(this, copyOfList);
            }

            public void DeleteAll()
            {
                // When someone calls delete all, we send a single specific message
                // which deletes everything.  The problem comes when individual requests
                // exist at the same time as the delete all request.  For simplicity we
                // add a delete list message to any active symbol list.  This might be
                // extra work sometimes, but it works.
                lock (_mutex)
                {
                    if ((_heldRequests != null) || (_sentRequests != null))
                    {
                        if (_heldRequests == null)
                            _heldRequests = new ListDelta();
                        _heldRequests.DeleteList();
                    }
                }
            }

            // The mutex is shared with the owning ListManager.
            public ListInternals(int id, ListManager listManager, Object mutex)
            {
                //Debug.Assert(id > 0, "A list id must be a positive number");
                _id = id;
                _listManager = listManager;
                _mutex = mutex;
            }

            public override string ToString()
            {
                // This is definitely aimed at debugging.  End users never see the list number.
                return "List #" + Id + ": '" + Name + "'";
            }

            // This section is required to implement the List interface.

            public void AddSymbol(string symbol)
            {
                lock (_mutex)
                {
                    if (_heldRequests == null)
                        _heldRequests = new ListDelta();
                    _heldRequests.AddSymbol(symbol);
                }
                SendRequestIfReady();
            }

            public void DeleteSymbol(string symbol)
            {
                lock (_mutex)
                {
                    if (_heldRequests == null)
                        _heldRequests = new ListDelta();
                    _heldRequests.DeleteSymbol(symbol);
                }
                SendRequestIfReady();
            }

            public void SetAllSymbols(HashSet<string> symbols)
            {
                lock (_mutex)
                {
                    if (_heldRequests == null)
                        _heldRequests = new ListDelta();
                    _heldRequests.SetAllSymbols(symbols);
                }
                SendRequestIfReady();
            }

            public void DeleteList()
            {
                lock (_mutex)
                {
                    if (_heldRequests == null)
                        _heldRequests = new ListDelta();
                    _heldRequests.DeleteList();
                }
                SendRequestIfReady();
            }

            public void RequestFromServer(OnListReturned onOnListReturned)
            {
                lock (_mutex)
                {
                    if (_onOnListReturned != null)
                    {
                        // There is already an outstanding request.
                        _onOnListReturned += onOnListReturned;
                    }
                    else
                    {
                        // Send a new request.
                        _onOnListReturned = onOnListReturned;
                        if (_heldRequests == null)
                            _heldRequests = new ListDelta();
                        _heldRequests.RequestFromServer();
                    }
                }
                SendRequestIfReady();
            }

            public int Id { get { return _id; } }

            public string Name
            {
                get { return _name; }
                set { _name = value; }
            }
        }

        /// <summary>
        /// Called with the contents of a list after <see cref="List.RequestFromServer"/>.
        /// </summary>
        public delegate void OnListReturned(List list, HashSet<string> Symbols);

        // This is responsible for a single symbol list.  This is available to the
        // main program.
        public interface List
        {
            void AddSymbol(string symbol);
            void DeleteSymbol(string symbol);
            void SetAllSymbols(HashSet<string> symbols);
            void DeleteList();
            int Id { get; }
            string Name { get; set; }

            // Pretend that the symbol list is read only.  C# does not provide a read only
            // HashSet and I didn't want to write my own.  If there are multiple requests
            // from the main program all at the same time, we will only send one request
            // to the server, and we will send the same list object to each listener.  This
            // list will never be null but it might be read only.
            void RequestFromServer(OnListReturned onOnListReturned);
        }

        private Object _mutex = new Object();
        private Dictionary<int, ListInternals> _lists = new Dictionary<int, ListInternals>();

        /// <summary>
        /// Return the list with the given id, creating the local object the first
        /// time that id is requested.
        /// </summary>
        public List Find(int id)
        {
            lock (_mutex)
            {
                ListInternals result;
                if (!_lists.TryGetValue(id, out result))
                {
                    result = new ListInternals(id, this, _mutex);
                    _lists[id] = result;
                }
                return result;
            }
        }

        // Lists that have something ready to send.
        private HashSet<ListInternals> _needsAttention = new HashSet<ListInternals>();

        private void NeedsAttention(ListInternals listInternals)
        {
            lock (_mutex)
            {
                _needsAttention.Add(listInternals);
            }
            CheckForWork();
        }

        // A delete_all_lists request is waiting to be sent.
        private bool _needToSendDeleteAll = false;

        // A delete_all_lists request has been sent and not yet answered.
        private bool _sentDeleteAll = false;

        // Only called from CheckForWorkImpl(), which runs with _mutex held.
        private void SendDeleteAllNow()
        {
            _needToSendDeleteAll = false;
            // Set this before sending.  The response delegate clears it, and that
            // delegate might be called before SendMessage() returns.
            _sentDeleteAll = true;
            _serverConnection.SendMessage(TalkWithServer.CreateMessage("command", "delete_all_lists"), DeleteAllResponse);
        }

        /// <summary>
        /// Delete every list on the server.
        /// </summary>
        public void DeleteAll()
        {
            lock (_mutex)
            {
                foreach (KeyValuePair<int, ListInternals> pair in _lists)
                    pair.Value.DeleteAll();
                _needToSendDeleteAll = true;
            }
            CheckForWork();
        }

        private void DeleteAllResponse(byte[] body, object unused)
        {
            lock (_mutex)
            {
                _sentDeleteAll = false;
                XmlDocument message = XmlHelper.Get(body);
                if (message == null)
                {
                    // The request failed.  Try again after the next timer event.
                    _recentError = true;
                    _needToSendDeleteAll = true;
                }
            }
            CheckForWork();
        }

        public delegate void ListOfListsCallback(List<List> lists);

        /// <summary>
        /// Ask the server for all of the lists.  The request is repeated until a
        /// reply has been parsed; then the callback is called once.
        /// </summary>
        public void GetListOfLists(ListOfListsCallback callback)
        {
            new ListOfListsHelper(callback, this);
        }

        // Holds the state for one GetListOfLists() call.
        private class ListOfListsHelper
        {
            private readonly ListOfListsCallback _callback;
            private readonly ListManager _listManager;

            public ListOfListsHelper(ListOfListsCallback callback, ListManager listManager)
            {
                _callback = callback;
                _listManager = listManager;
                SendRequest();
            }

            private void Response(byte[] body, object unused)
            {
                bool success = false;
                try
                {
                    XmlDocument message = XmlHelper.Get(body);
                    if (message != null)
                    {
                        // If message is null there was a problem.  The XML didn't parse or
                        // there was a communication problem.
                        List<List> result = new List<List>();
                        XmlNode source = message.Node(0).Node("LISTS");
                        foreach (XmlNode currentNode in source.Enum())
                        {
                            List current = _listManager.Find(currentNode.Property("ID", -1));
                            current.Name = currentNode.Property("NAME");
                            result.Add(current);
                        }
                        success = true;
                        // We don't really care if this fails.  We did our part by
                        // delivering the message.
                        _callback(result);
                    }
                }
                catch
                {
                    // Deliberate.  A failure before success was set causes a retry below.
                    // A failure inside the callback is ignored.
                }
                if (!success)
                    SendRequest();
            }

            // NOTE(review): the generic arguments of this field were lost from the
            // source text.  <string, string> is assumed to be the return type of
            // TalkWithServer.CreateMessage() - confirm.
            static readonly Dictionary<string, string> REQUEST = TalkWithServer.CreateMessage("command", "get_list_of_lists", "allow_negative_list_ids", true);

            private void SendRequest()
            {
                _listManager._sendManager.SendMessage(REQUEST, Response);
            }
        }

        // Set when a request fails.  Cleared by the timer.  While set we do not
        // send anything.
        private bool _recentError;

        // True while CheckForWorkImpl() is running.  Used to detect recursion.
        private bool _checkForWorkInProgress;

        private void CheckForWork()
        {
            lock (_mutex)
            {
                if (_checkForWorkInProgress)
                    // When you send a request to the server, the response usually comes back in another thread.
                    // However, your delegate might get called immediately, in the same thread, before the function
                    // sending the request returns to you.  (This behavior is not specific to my code.  I inherit
                    // this from the way asynchronous calls work for sockets.)  This typically happens when there
                    // is an error, such as a broken socket.  As a result, this function might be called recursively.
                    return;
                _checkForWorkInProgress = true;
                try
                {
                    CheckForWorkImpl();
                }
                finally
                {
                    // Always clear the flag.  If an exception left it set, every later
                    // call would return immediately and nothing would be sent again.
                    _checkForWorkInProgress = false;
                }
            }
        }

        private void CheckForWorkImpl()
        {
            if (_recentError)
                // Wait for the timer to tell us to retry.
                return;
            if (_sentDeleteAll)
                // The delete all request covers every list, so we don't do anything else until this is confirmed.
                return;
            if (_needToSendDeleteAll)
            {
                // For simplicity, don't do any more work until we get a confirmation.  It's tempting to try to
                // optimize this, but it shouldn't happen very much.  If we send all the requests to the same
                // server connection, we know the order will be preserved.  But that's a little complicated.
                SendDeleteAllNow();
                return;
            }
            HashSet<ListInternals> needsAttention = null;
            lock (_mutex)
            {
                if (_needsAttention.Count > 0)
                {
                    // Make a copy of the list and work on that copy.  Immediately clear out _needsAttention.  If
                    // there is a problem with the connection, sendRequestNow might end up adding something to
                    // _needsAttention.  And it might happen in this thread so the locks won't help us.  In that
                    // case we will not immediately handle the new request.  We will probably do it the next time
                    // the timer goes off.  And that's the right thing to do.  Otherwise when a connection goes
                    // down we might get into an infinite loop.
                    needsAttention = _needsAttention;
                    _needsAttention = new HashSet<ListInternals>();
                }
            }
            if (null != needsAttention)
            {
                foreach (ListInternals listInternals in needsAttention)
                {
                    listInternals.SendRequestNow();
                }
            }
        }

        private void OnTimer(ConnectionLifeCycle sender)
        {
            _recentError = false;
            CheckForWork();
        }

        private ConnectionLifeCycle _connectionLifeCycle;
        private ISendManager _sendManager;

        internal ListManager(ConnectionLifeCycle connectionLifeCycle, ISendManager sendManager)
        {
            // Wherever possible we use the SendManager.  It is the simpler interface and the
            // preferred interface.  However, some of our messages are more complicated and
            // have to be sent in a specific order.  For those we manually take care of pausing
            // and retrying.
            _connectionLifeCycle = connectionLifeCycle;
            _sendManager = sendManager;
            _connectionLifeCycle.OnTimer += new ConnectionLifeCycle.Callback(OnTimer);
            _connectionLifeCycle.OnConnection += new ConnectionLifeCycle.ConnectionCallback(_connectionLifeCycle_OnConnection);
        }

        /// <summary>
        /// Use this when the SendManager isn't good enough.
        /// This will initially be null, but it will never change from not null back to null.
        /// </summary>
        private TalkWithServer _serverConnection;

        void _connectionLifeCycle_OnConnection(ConnectionLifeCycle source, TalkWithServer serverConnection)
        {
            _serverConnection = serverConnection;
        }
    }
}