using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using TradeIdeas.MiscSupport;
using System.Threading;
using System.Web;
using System.Security.Cryptography;
using TradeIdeas.TIProData.Interfaces;

namespace TradeIdeas.TIProData
{
    /// <summary>
    /// Use this to request streaming data about one specific symbol.  Request any fields
    /// you want, including standard fields or custom formulas.
    ///
    /// Despite the size of this file, the interface is very small.
    /// o StockDataManager.Find() -- Finds the appropriate object to start the process.
    /// o StockDataManager.Subscribe() -- Request streaming data.  Returns a Token.
    /// o StockDataManager.Token.Cancel() -- Stop the request and clean up.
    ///   StockDataManager.Token contains other data, too, including a look at the
    ///   original request.
    /// o StockDataManager.MakeRowDataKey() -- Required to look up the result in the
    ///   callback.
    ///
    /// This is responsible for merging similar requests.  If two people both ask for the
    /// price of MSFT, we only send one request to the server but we share the results
    /// with both listeners.  If one person asks for the price and volume of MSFT, and
    /// someone else asks for the price and 5 minute range of MSFT, they will both share
    /// a single request which is the union of both requests.
    ///
    /// When you cancel a request, that might cancel the request to the server.  Or it
    /// might replace the current request with a simpler one.  When you ask for new data,
    /// or cancel data, we might try to look for other requests and group them all together.
    ///
    /// If you want data that doesn't change often, like the company name or sector for
    /// a stock symbol, look at SymbolDetailsCacheManager.cs.  If you want data for a lot
    /// of stocks look at TopListRequest.  If you want similar data but for a historical
    /// time, look at TopListManager.
    ///
    /// This was inspired by QuoteFeedManager.  This will hopefully replace QuoteFeedManager.
    ///
    /// From the outside this looks like most classes in TIPro data.  You can make a new
    /// request or cancel an old request from any thread.  You will get callbacks and we
    /// make no promises about what thread you will be in.
    ///
    /// On the inside this looks like our C++ server code.  And it looks like a lot of C#
    /// GUI code.  We automatically create one thread just for us.  All external operations
    /// (e.g. a user requests data from us, or the server gives data to us) are sent to
    /// this thread.  Only this thread can access most variables and methods.  The only
    /// lock, and the only data shared with other threads, is a queue that sends messages
    /// to this thread.
    /// </summary>
    public class StockDataManager
    {
        /// <summary>
        /// Given a custom formula, come up with an appropriate name.  Use this name when
        /// looking for data in the RowData object that we provide with each callback.
        ///
        /// If two people both request the same custom formula, even without knowing it,
        /// their requests should be merged.
        ///
        /// Some names are somewhat standard and used in a lot of places in the code.
        /// Library routines, like when you right click on a row and a "send to" menu
        /// appears, expect to see certain fields with standard names.
        ///
        /// This is not as fast as I'd like.  If convenient you should store a copy of
        /// this in a variable.
        /// </summary>
        /// <param name="formula">
        /// The formula that you want the server to compute.  The same value that you
        /// sent to Subscribe().
        /// </param>
        /// <returns>
        /// For a formula of the form "[name]" (letters, digits and underscores only) the
        /// result of CommonAlertFields.GetFilterWireName(name).  For anything else a
        /// fixed size key starting with "F_" which is derived from a hash of the formula.
        /// </returns>
        public static string MakeRowDataKey(string formula)
        {
            if ((formula.Length > 1) && (formula[0] == '[') && (formula[formula.Length - 1] == ']'))
            {
                string middle = formula.Substring(1, formula.Length - 2);
                bool valid = true;
                foreach (char ch in middle)
                {
                    if (!((ch == '_') || Char.IsLetterOrDigit(ch)))
                    {
                        valid = false;
                        break;
                    }
                }
                if (valid)
                {
                    // This looks like a simple filter and nothing else.  This type
                    // of request can come from a lot of places and it usually has
                    // the same key.
                    return CommonAlertFields.GetFilterWireName(middle);
                }
            }
            // The size is fixed.  The result is unreadable.  It is possible, but highly
            // unlikely, that two inputs will return the same key.  The alternatives
            // were less appealing.  We used to use a simple encoding where keys can
            // get very long.  You could try to make a short key like "K1", "K2", etc.
            // To make that work you'd need to keep a complete list of translations,
            // which could look like a memory leak.  To avoid the memory leak you could
            // make each piece of code that uses a key increment a reference counter, and
            // decrement it when it was done.  That would be a serious pain, if it was
            // possible at all.
            return MakeHashedRowDataKey(formula);
        }

        /// <summary>
        /// Build a key from the SHA1 hash of the UTF-8 bytes of the formula.  The hash is
        /// base 64 encoded, then '+' becomes '_', '/' becomes '.', and the '=' padding
        /// is dropped.  The result is prefixed with "F_".
        /// </summary>
        /// <param name="formula">The formula to hash.</param>
        /// <returns>The key.</returns>
        private static string MakeHashedRowDataKey(string formula)
        {
            byte[] resultAsBytes;
            using (SHA1 hasher = SHA1.Create())
            {
                resultAsBytes = hasher.ComputeHash(Encoding.UTF8.GetBytes(formula));
            }
            string result = Convert.ToBase64String(resultAsBytes);
            result = result.Replace('+', '_').Replace('/', '.').Replace("=", "");
            return "F_" + result;
        }

        // True only in the thread running MainLoop().  [ThreadStatic] gives every other
        // thread its own copy, which keeps the default value of false.
        [ThreadStatic]
        static bool _correctThread;

        /// <summary>
        /// Queue the action to be run by MainLoop() and wake that thread up.
        /// This can be called from any thread.
        /// </summary>
        /// <param name="action">The work to do in our private thread.</param>
        private static void DoInThread(Action action)
        {
            lock (_eventQueue)
            {
                _eventQueue.Enqueue(action);
            }
            _eventsAvailable.Set();
        }

        /// <summary>
        /// Debug-only check that the caller is running in the MainLoop() thread.
        /// </summary>
        private static void AssertInThread()
        {
            System.Diagnostics.Debug.Assert(_correctThread);
        }

        /// <summary>
        /// You get a new one of these each time you call Subscribe().  You can use == or != on it
        /// when you get a response to make sure the response came from the correct call to
        /// Subscribe().
        ///
        /// This was inspired by TopListRequest.Token.  Among other things, this groups together
        /// a lot of data all related to the request that created it.
        /// </summary>
        public interface Token
        {
            /// <summary>
            /// Cancel the given request.  It is safe to call this even after the request
            /// has already been canceled.
            /// </summary>
            void Cancel();

            /// <summary>
            /// This can be used to distinguish between two requests made by the same
            /// listener.  Normally you can use the Token object for that.  However, in a multi-
            /// threaded environment this could still help.  You don't know the identity of
            /// the Token object until Subscribe() returns.  But the result of a subscription
            /// might appear in another thread before Subscribe() returns.
            ///
            /// This can be used for anything.  It might be null.  This will be whatever value
            /// you gave to Subscribe().
            /// </summary>
            object ClientInfo { get; }

            /// <summary>
            /// From the original request.
            /// </summary>
            String Symbol { get; }

            /// <summary>
            /// From the original request.
            /// The details of this request have not been defined.
            /// Internally we use a set of strings.  Ideally you'd get a read-only
            /// version of this set.  C# doesn't make a simple read only wrapper for
            /// sets.  It would be dangerous to share the non-read-only version, but
            /// it would be slow to make copies that might never be used.  For now
            /// I'm just copying an object that we already have, the array of strings.
            /// </summary>
            String[] Formulas { get; }
        }

        /// <summary>
        /// What we hand to the callback given to Subscribe(): the data and the Token
        /// identifying the subscription that the data is being delivered to.
        /// </summary>
        public class Response
        {
            public RowData RowData;
            public Token Token;
            // Time?  Probably not that interesting.  For a single symbol request
            // the server pushes things out as soon as it gets new data.  So the time
            // associated with the response is probably the current time plus the
            // network latency.  Maybe there's delayed data for someone who didn't
            // pay.  Maybe (if we add that option) we're looking at data frozen at
            // the close.  Maybe we're getting an update in the middle of the night
            // because we asked for it, and the last data is from long ago.
        }

        /// <summary>
        /// Pretend like we got this record from the server.  Send it to any and all listeners.
        /// This will be sent to another thread, exactly like data from the server.
        /// </summary>
        /// <param name="symbol">This message will go to anyone subscribed to this symbol.</param>
        /// <param name="encodedValues">The field names and values, url encoded.</param>
        public void DebugSend(string symbol, string encodedValues)
        {
            RowData rowData = new RowData();
            var parsed = HttpUtility.ParseQueryString(encodedValues);
            foreach (string key in parsed)
            {
                rowData.Data[key] = parsed[key];
            }
            DebugSend(symbol, rowData);
        }

        /// <summary>
        /// Pretend like we got this record from the server.  Send it to any and all listeners.
        /// This will be sent to another thread, exactly like data from the server.
        /// </summary>
        /// <param name="symbol">This message will go to anyone subscribed to this symbol.</param>
        /// <param name="rowData">
        /// This will be sent to all listeners.  If it has no symbol field yet, we add one
        /// using the symbol argument.
        /// </param>
        public void DebugSend(string symbol, RowData rowData)
        {
            if (!rowData.Data.ContainsKey(CommonAlertFields.SYMBOL))
            {
                rowData.Data[CommonAlertFields.SYMBOL] = symbol;
            }
            DoInThread(delegate { DebugSendInThread(symbol, rowData); });
        }

        /// <summary>
        /// The second half of DebugSend().  If nobody is registered for the symbol this
        /// does nothing.
        /// </summary>
        private void DebugSendInThread(string symbol, RowData rowData)
        {
            SymbolRequest symbolRequest;
            _allRequests.TryGetValue(symbol, out symbolRequest);
            if (null != symbolRequest)
            {
                symbolRequest.SendRowData(rowData);
            }
        }

        /// <summary>
        /// The only implementation of Token.  This also carries the internal data which
        /// we need but which the user should not see.
        /// </summary>
        private class TokenImpl : Token
        {
            public override string ToString()
            {
                StringBuilder sb = new StringBuilder();
                sb.Append("StockDataManager.TokenImpl(").Append(Symbol);
                foreach (string formula in _formulas)
                {
                    sb.Append(", ").Append(formula);
                }
                sb.Append(")");
                return sb.ToString();
            }

            public void Cancel()
            {
                DoInThread(delegate
                {
                    SymbolRequest request;
                    if (Owner._allRequests.TryGetValue(Symbol, out request))
                    {
                        // If we didn't get this far, the owner was already released.
                        // That would mean that all listeners were already canceled.
                        // That would mean that this cancel request is a duplicate and
                        // we should silently ignore it.
                        request.Cancel(this);
                    }
                });
            }

            private readonly object _clientInfo;
            public object ClientInfo { get { return _clientInfo; } }

            private readonly String _symbol;
            public string Symbol { get { return _symbol; } }

            private readonly string[] _formulas;
            public string[] Formulas { get { return _formulas; } }

            // The same formulas as in Formulas, with duplicates removed.  The reference
            // counting in SymbolRequest is based on this, not on the array.
            public readonly HashSet<string> FormulaSet;
            public readonly Action<Response> Callback;
            public readonly StockDataManager Owner;

            public TokenImpl(Action<Response> callback, object clientInfo, String symbol,
                string[] formulas, HashSet<string> formulaSet, StockDataManager owner)
            {
                Callback = callback;
                _clientInfo = clientInfo;
                _symbol = symbol;
                _formulas = formulas;
                FormulaSet = formulaSet;
                Owner = owner;
            }
        }

        /// <summary>
        /// Request streaming data for one symbol.  This can be called from any thread.
        /// The request is registered in our private thread, so this returns immediately.
        /// </summary>
        /// <param name="callback">
        /// We call this each time we have data for this subscription.  We make no promises
        /// about what thread you will be in.
        /// </param>
        /// <param name="clientInfo">
        /// Anything you want.  This might be null.  We store it in the Token and otherwise
        /// ignore it.
        /// </param>
        /// <param name="symbol">The symbol you want data for.</param>
        /// <param name="formulas">
        /// The formulas that you want the server to compute.  Use MakeRowDataKey() to find
        /// each result in the RowData.
        /// </param>
        /// <returns>Use this to cancel the request or to identify responses.</returns>
        public Token Subscribe(Action<Response> callback, object clientInfo, string symbol, params string[] formulas)
        {
            HashSet<string> formulaSet = new HashSet<string>(formulas);
            TokenImpl result = new TokenImpl(callback, clientInfo, symbol, formulas, formulaSet, this);
            DoInThread(delegate
            {
                SymbolRequest request;
                _allRequests.TryGetValue(symbol, out request);
                if (null == request)
                {
                    request = new SymbolRequest(symbol, this);
                    _allRequests[symbol] = request;
                }
                request.Add(result);
            });
            return result;
        }

        private readonly ISendManager _sendManager;

        private StockDataManager(ISendManager sendManager)
        {
            _sendManager = sendManager;
        }

        // One StockDataManager per ISendManager.  Always lock _instances before touching it.
        private static Dictionary<ISendManager, StockDataManager> _instances =
            new Dictionary<ISendManager, StockDataManager>();

        /// <summary>
        /// Find the StockDataManager associated with the given connection.  We create one
        /// the first time you ask, and we return the same object every time after that.
        /// </summary>
        /// <param name="sendManager">Where to send our requests.</param>
        /// <returns>The one and only StockDataManager for this ISendManager.</returns>
        public static StockDataManager Find(ISendManager sendManager)
        {
            lock (_instances)
            {
                StockDataManager result;
                if (_instances.TryGetValue(sendManager, out result))
                {
                    return result;
                }
                result = new StockDataManager(sendManager);
                _instances.Add(sendManager, result);
                return result;
            }
        }

        private static StockDataManager _main;

        /// <summary>
        /// Find the StockDataManager associated with ConnectionMaster.First.
        /// </summary>
        /// <returns>The StockDataManager for the default connection.</returns>
        public static StockDataManager Find()
        {
            // There is no lock here.  If two threads get here at the same time they both
            // call Find(ISendManager), which does use a lock.
            if (null == _main)
            {
                _main = Find(ConnectionMaster.First.SendManager);
            }
            return _main;
        }

        /// <summary>
        /// We merge all user requests for the same symbol.  We should never have more than one of these per symbol.
        /// The user doesn't directly see this.
        /// </summary>
        private class SymbolRequest : TopListRequest.Listener
        {
            /// <summary>
            /// Start with a FIFO.  The first item you Add() will be the first item you Get().
            ///
            /// This queue can only have one copy of each item at a time.  If you try to add
            /// an item which is already in the list, that operation is silently ignored.  Note
            /// that the item keeps its place in the list and is not moved to the back of the
            /// list.  We absolutely keep the original and ignore the new value.
            ///
            /// You get this and a lot more from a Java LinkedHashSet.  StackOverflow has
            /// a full implementation of that data structure, but it's huge and barely tested.
            /// http://stackoverflow.com/questions/9346526/what-is-the-equivalent-of-linkedhashset-java-in-c
            /// </summary>
            private class UniqueFifo<T> where T : class
            {
                private readonly Queue<T> _fifo = new Queue<T>();
                private readonly HashSet<T> _unique = new HashSet<T>();

                public void Add(T item)
                {
                    if (_unique.Add(item))
                    {
                        _fifo.Enqueue(item);
                    }
                }

                /// <summary>
                /// Find the oldest item in the queue.  Remove it and return it.
                ///
                /// Once you Get() an item it's safe to add the item again.
                /// </summary>
                /// <returns>The oldest item in the queue, or null if the queue is empty.</returns>
                public T Get()
                {
                    if (_unique.Count == 0)
                    {
                        return null;
                    }
                    T result = _fifo.Dequeue();
                    _unique.Remove(result);
                    return result;
                }
            }

            // Sometimes we defer an operation.  If someone makes a change, we wait and see if a related change is
            // coming before doing anything.  This is an implementation detail, like caching.
            private static readonly UniqueFifo<SymbolRequest> _needsAttention = new UniqueFifo<SymbolRequest>();

            // This will execute one item and return true.  Or if there's no work, it returns false.
            // This is meant to be low priority.  We should be able to stop in the middle of our idle
            // tasks if new high priority work appears.
            public static bool OnIdle()
            {
                SymbolRequest request = _needsAttention.Get();
                if (null == request)
                {
                    return false;
                }
                request.UpdateYourself();
                return true;
            }

            /// <summary>
            /// Throw out the current server request, if any.  Then either build and send a
            /// new one, or, if no one is listening any more, ask the owner to forget us.
            /// </summary>
            private void UpdateYourself()
            {
                AssertInThread();
                // Assume for simplicity that there was a real change.  It's certainly possible
                // that someone added a new field and then deleted it before we got a chance to
                // do anything with it.  So the new request will be identical to the current
                // request.  We could check for that, but it seems rare and unlikely.
                if (null != _topListToken)
                {
                    _topListToken.Cancel();
                }
                _topListToken = null;
                if (_allUserRequests.Count == 0)
                {
                    // The main program isn't listening any more.  No one for us to forward
                    // anything to.  No useful data for us to save.
                    //
                    // Note:  No listeners should imply no data.  If the last listener went
                    // away and we think we still need data for the Price field, that's a
                    // serious error.  And it's an error that's internal to this file.  However,
                    // the inverse is not necessarily true.  Someone can ask to listen without
                    // listening for any specific fields.  This could be useful in debugging.
                    System.Diagnostics.Debug.Assert(_referenceCounts.Count == 0);
                    _owner.Remove(this);
                }
                else
                {
                    // TODO did we add or remove fields?  If we added at least one field we should
                    // immediately request more data.  If we only removed fields, we should wait
                    // until the next item was due, just like if we automatically reconnect.
                    TopListRequest request = new TopListRequest();
                    // Row data key -> formula.
                    request.ExtraColumnFormulas = new Dictionary<string, string>();
                    foreach (var kvp in _referenceCounts)
                    {
                        request.ExtraColumnFormulas[MakeRowDataKey(kvp.Key)] = kvp.Key;
                    }
                    // TODO if we are removing fields, not adding any fields, set the LastUpdate.
                    // request.LastUpdate =

                    // For simplicity I'm always looking at the current value.  Maybe there is
                    // a need for data that freezes at the close.  If so, we'd need to separate
                    // SymbolRequest objects based on this, not just the symbol.
                    request.OutsideMarketHours = true;
                    request.SingleSymbol = _symbol;
                    request.Streaming = true;
                    // There's no good way to know for sure if we need it, so I'll always ask
                    // for the database.  This is a one time cost.  (Once per subscription.)
                    // This choice was mostly aimed at the one-time requests for the exchange or
                    // company name associated with a symbol.  The server gets a lot of those at
                    // once, and skipping the database could make a bigger difference.
                    //
                    // NOTE(review): the comment above says we always ask for the database, but
                    // the value assigned below is false.  Confirm which one is intended.
                    request.UseDatabase = false;
                    _topListToken = request.Send(_owner._sendManager, this);
                }
            }

            /// <summary>
            /// Register a new listener.  If it needs a formula that no one else was using,
            /// schedule a new server request.
            /// </summary>
            /// <param name="userRequest">The new listener.  Must not already be registered.</param>
            public void Add(TokenImpl userRequest)
            {
                AssertInThread();
                bool newlyAdded = _allUserRequests.Add(userRequest);
                System.Diagnostics.Debug.Assert(newlyAdded);
                bool somethingChanged = false;
                foreach (string formula in userRequest.FormulaSet)
                {
                    int referenceCount;
                    // If the formula is new, TryGetValue() leaves us with 0.
                    _referenceCounts.TryGetValue(formula, out referenceCount);
                    if (referenceCount == 0)
                    {
                        somethingChanged = true;
                    }
                    referenceCount++;
                    _referenceCounts[formula] = referenceCount;
                }
                if (somethingChanged)
                {
                    _needsAttention.Add(this);
                }
            }

            private readonly HashSet<TokenImpl> _allUserRequests = new HashSet<TokenImpl>();
            private readonly string _symbol;
            private readonly StockDataManager _owner;
            private readonly Dictionary<string, int> _referenceCounts = new Dictionary<string, int>();  // Formula -> reference count.
            private TopListRequest.Token _topListToken;

            public string GetSymbol()
            {
                return _symbol;
            }

            public SymbolRequest(String symbol, StockDataManager owner)
            {
                _symbol = symbol;
                _owner = owner;
            }

            /// <summary>
            /// Remove a listener.  If that was the last user of some formula, or the last
            /// listener of any type, schedule an update.
            /// </summary>
            /// <param name="userRequest">
            /// The listener to remove.  If it is not currently registered we silently ignore
            /// the request.
            /// </param>
            public void Cancel(TokenImpl userRequest)
            {
                AssertInThread();
                if (!_allUserRequests.Remove(userRequest))
                {
                    // Not found.  This is explicitly allowed.  Possibly a duplicate call.
                    return;
                }
                bool somethingChanged = false;
                foreach (String formula in userRequest.FormulaSet)
                {
                    int referenceCount = _referenceCounts[formula];
                    referenceCount--;
                    System.Diagnostics.Debug.Assert(referenceCount >= 0);
                    if (referenceCount == 0)
                    {
                        _referenceCounts.Remove(formula);
                        somethingChanged = true;
                    }
                    else
                    {
                        _referenceCounts[formula] = referenceCount;
                    }
                }
                // Someone can listen without asking for any fields.  If that was the last
                // listener, no reference count changed, but we still have to clean up.
                // Otherwise this object would stay registered forever, and any server
                // request that was still running would never be canceled.
                if (somethingChanged || (_allUserRequests.Count == 0))
                {
                    _needsAttention.Add(this);
                }
            }

            void TopListRequest.Listener.OnRowData(List<RowData> rows, DateTime? start, DateTime? end, TopListRequest.Token token)
            {
                DoInThread(delegate
                {
                    if (rows.Count == 0)
                    {
                        // No matching data.  Probably this was an invalid symbol.  For now, ignore it.
                        // Maybe we should send this info to the caller somehow?
                        return;
                    }
                    else if (rows.Count > 1)
                    {
                        // Some type of serious confusion.  Probably a bug here or in the server.  No
                        // way to know which row is correct, so just ignore it.
                        System.Diagnostics.Debug.WriteLine("Too many rows returned in StockDataManager.cs.");
                        return;
                    }
                    SendRowData(rows[0]);
                });
            }

            /// <summary>
            /// This is public only for debug purposes.  This sends the given row data to any listeners.
            /// </summary>
            /// <param name="rowData">Every listener gets this same object.</param>
            public void SendRowData(RowData rowData)
            {
                AssertInThread();
                foreach (TokenImpl userRequest in _allUserRequests)
                {
                    try
                    {
                        Response response = new Response();
                        response.RowData = rowData;
                        response.Token = userRequest;
                        userRequest.Callback(response);
                    }
                    catch (Exception ex)
                    {
                        // An exception in someone else's code.  That person is no longer sitting
                        // on the stack above us.  Nothing to do but report it and move on.
                        System.Diagnostics.Debug.WriteLine("Exception in external callback in StockDataManager.cs:  " + ex);
                    }
                }
            }

            void TopListRequest.Listener.OnMetaData(TopListRequest.Token token)
            {
                // The server should only send this if ((we have a collaborate string) AND
                // (we ask for this)).  We could just ignore this.  But if it happens
                // I'd like to know.
                System.Diagnostics.Debug.WriteLine("Unexpected metadata in StockDtaManager.");
            }

            void TopListRequest.Listener.OnDisconnect(TopListRequest.Token token)
            {
                DoInThread(delegate
                {
                    // This is the same routine as when someone adds or deletes a user request.
                    // This will shut down and throw out the old server request (if it exists)
                    // and build and send a new one (if required).
                    _needsAttention.Add(this);
                });
            }
        }

        // Symbol -> the one and only SymbolRequest for that symbol.  Only touch this from
        // our private thread.
        private readonly Dictionary<string, SymbolRequest> _allRequests = new Dictionary<string, SymbolRequest>();

        // Messages waiting for our private thread.  Always lock _eventQueue before touching it.
        private static readonly Queue<Action> _eventQueue = new Queue<Action>();
        private static readonly ManualResetEventSlim _eventsAvailable = new ManualResetEventSlim(false);

        /// <summary>
        /// The body of our private thread.  This never returns.  Run everything in the
        /// event queue, then one low priority job, then repeat.  Sleep when there is
        /// nothing to do.
        /// </summary>
        private static void MainLoop()
        {
            _correctThread = true;
            while (true)
            {
                while (true)
                {
                    Action nextEvent;
                    lock (_eventQueue)
                    {
                        if (_eventQueue.Count == 0)
                        {
                            break;
                        }
                        else
                        {
                            nextEvent = _eventQueue.Dequeue();
                        }
                    }
                    // Release the lock before doing the work.
                    try
                    {
                        nextEvent();
                    }
                    catch (Exception ex)
                    {
                        // Presumably a bug in this file.
                        System.Diagnostics.Debug.WriteLine("Unexpected exception in internal StockDataManager callback:  " + ex);
                    }
                }
                if (SymbolRequest.OnIdle())
                {
                    // We did some work from our list of low priority jobs.  Go back to the top of the
                    // loop and see if there's any high priority work.  Eventually get back here to check
                    // for more low priority work.
                }
                else
                {
                    // No more work found.  Go to sleep until someone tells us we have more work.
                    // We always look at the queue again after the Reset(), so we can't miss an
                    // event that was added just before the Reset().
                    _eventsAvailable.Wait();
                    _eventsAvailable.Reset();
                }
            }
        }

        static StockDataManager()
        {
            // A background thread will not keep the process alive.
            Thread t = new Thread(MainLoop);
            t.IsBackground = true;
            t.Start();
        }

        /// <summary>
        /// Forget about the given SymbolRequest.  It calls this when its last listener has gone away.
        /// </summary>
        /// <param name="symbolRequest">The object which is no longer needed.</param>
        private void Remove(SymbolRequest symbolRequest)
        {
            AssertInThread();
            // Only remove the entry if it still refers to this exact object.  A retired
            // SymbolRequest can be asked to update itself again (e.g. a late OnDisconnect)
            // after a new SymbolRequest was registered for the same symbol.  Removing by
            // symbol alone would throw out the new object's entry.  Then Token.Cancel()
            // could never find the new object and its server request would run forever.
            string symbol = symbolRequest.GetSymbol();
            SymbolRequest registered;
            if (_allRequests.TryGetValue(symbol, out registered) && ReferenceEquals(registered, symbolRequest))
            {
                _allRequests.Remove(symbol);
            }
        }
    }
}