using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using TradeIdeas.MiscSupport;
using TradeIdeas.TIProData;
using System.Windows.Forms;
using TradeIdeas.TIProData.Interfaces;

/* A wrapper around TopListRequest.  An attempt to reuse requests for streaming data.
 *
 * Note that this is in TIProGUI.  So we use the standard GUI rules.  You should only
 * call this class (including the class constructor!) from the GUI thread.  You will
 * receive responses from the GUI thread.  If you violate the rules, almost anything
 * is possible.
 */

namespace TradeIdeas.TIProGUI
{
    /// <summary>
    /// Callback signature for <see cref="QuoteFeedManager.QuoteFeedDataReceived"/>.
    /// </summary>
    /// <param name="symbol">Symbol the data belongs to.</param>
    /// <param name="data">The row that was just received for that symbol.</param>
    public delegate void QuoteFeedDataReceivedHandler(string symbol, RowData data);

    /// <summary>
    /// Keeps at most one streaming, single-symbol TopListRequest per symbol, remembers the
    /// most recent row received for each symbol, and raises
    /// <see cref="QuoteFeedDataReceived"/> whenever a new row arrives.
    /// </summary>
    public class QuoteFeedManager : TopListRequest.Listener
    {
        /*
         * These are some ideas on how we might reorganize the QuoteFeedManager.  This code isn't
         * finished yet and doesn't compile yet.
         *
         * . This will notify a user when his specific request was satisfied.  Previously we had
         *   one callback for all events.
         * . This will keep track of how many people are watching a symbol, and will release the
         *   request when the last listener goes away.
         * . This will merge various requests for the same symbol.  Presumably the different
         *   requests might not have exactly the same fields, but there is enough overlap that
         *   we still want to send just one request per symbol at a time.
         * . This does not have any hard coded fields.  Unless someone asks for a field, we'll
         *   never get it.  And you can ask for any field.
         *

        // Subscribe(me, "Price", "Vol10");
        public void Subscribe(QuoteFeedDataReceivedHandler callback, string symbol, params string[] columnInternalCodes)
        {
            SymbolRequest request = _allSymbolRequests[symbol];
            bool newRequest = null == request;
            if (newRequest)
            {
                request = new SymbolRequest(symbol);
                _allSymbolRequests[symbol] = request;
            }
            request.Add(listener, columnInternalCodes);
            // Maybe keep track of the callbacks, too, so not everyone gets all data.
        }

        private class Listener
        {
            public QuoteFeedDataReceivedHandler callback;
            public string[] columnInternalCodes;
        }

        private HashSet<SymbolRequest> _needsAttention = new HashSet<SymbolRequest>();

        private class SymbolRequest
        {
            private Dictionary<QuoteFeedDataReceivedHandler, Listener> _allListeners = new Dictionary<QuoteFeedDataReceivedHandler, Listener>();
            string symbol;
            Dictionary<string, int> _refereneceCounts = new Dictionary<string, int>();  // Column internal code -> reference count.

            public SymbolRequest(string symbol)
            {
                // TODO: Complete member initialization
                this.symbol = symbol;
            }

            public void Add(QuoteFeedDataReceivedHandler callback, string[] columnInternalCodes)
            {
                // TODO check if this listener is already registered.
                Listener listener = new Listener();
                listener.callback = callback;
                listener.columnInternalCodes = columnInternalCodes;
                _allListeners[callback] = listener;
                bool somethingChanged = false;
                int referenceCount;
                foreach (string internalCode in columnInternalCodes)
                {
                    _refereneceCounts.TryGetValue(internalCode, out referenceCount);
                    if (referenceCount == 0)
                        somethingChanged = true;
                    referenceCount++;
                    _refereneceCounts[internalCode] = referenceCount;
                }
                if (somethingChanged)
                {
                    _needsAttention.Add(this);
                }
            }

            public void Cancel(QuoteFeedDataReceivedHandler callback)
            {
                Listener listener;
                _allListeners.TryGetValue(callback, out listener);
                if (null == listener)
                    // This is explicitly allowed;
                    return;
                foreach (String internalCode in listener.columnInternalCodes)
                {
                    int referenceCount = _refereneceCounts[internalCode];
                    referenceCount--;
                    // assert referenceCount >= 0
                    if (referenceCount == 0)
                        _refereneceCounts.Remove(internalCode);
                    else
                        _refereneceCounts[internalCode] = referenceCount;
                }
                if (somethingChanged)
                    _needsAttention.Add(this);
            }
        }

        private void IdleHandler()
        {
            var needsAttention = _needsAttention;
            _needsAttention = new HashSet<SymbolRequest>();
            foreach (SymbolRequest request in needsAttention)
            {
                request.UpdateYourself();
                if (request.IsEmpty())
                {
                    _allSymbolRequests.Remove(request.symbol);
                }
            }
        }

        private Dictionary<string, SymbolRequest> _allSymbolRequests = new Dictionary<string, SymbolRequest>();  // Symbol -> request.

         */

        /// <summary>
        /// Raised, via GuiEnvironment.BeginInvokeIfRequired, each time a new row is accepted
        /// for a symbol that has an active quote feed.
        /// </summary>
        public event QuoteFeedDataReceivedHandler QuoteFeedDataReceived;

        /// <summary>
        /// Token of the active streaming request for each symbol.  Should be one per symbol.
        /// Use ConcurrentDictionary to fix threading issues: https://stackoverflow.com/questions/39923397/dictionary-lookup-throws-index-was-outside-the-bounds-of-the-array
        /// </summary>
        private ConcurrentDictionary<string, TopListRequest.Token> _listeners = new ConcurrentDictionary<string, TopListRequest.Token>();

        /// <summary>
        /// Most recently received RowData objects for each symbol.
        /// Use ConcurrentDictionary to fix threading issues: https://stackoverflow.com/questions/39923397/dictionary-lookup-throws-index-was-outside-the-bounds-of-the-array
        /// </summary>
        private ConcurrentDictionary<string, RowData> _currentValueStore = new ConcurrentDictionary<string, RowData>();

        /// <summary>
        /// Used as a single manager for quote feeds (top lists for a single symbol).  Can be shared across TIPro.
        /// To start receiving data, look for the QuoteFeedDataReceived event handler.
        /// </summary>
        public QuoteFeedManager()
        {
        }

        /// <summary>
        /// Internal codes of the columns requested for every quote feed.
        /// </summary>
        private static readonly List<string> ColumnInternalCodes = new List<string>
        {
            // TODO this list was copied as is from SymbolDetails.cs.  It is way too long.  We ignore most of these.
            "D_Name", "D_Exch", "D_Symbol", "UpJan1P", "R60M", "U20DP", "RD", "MCap", "R5D",
            "ShortG", "RY", "SFloat", "Cash", "EPS", "Debt", "EarningD", "Revenue", "STH",
            "QRevG", "STP", "Price", "TV", "FCD", "FCP", "RV", "VWV"
        };

        /// <summary>
        /// Extra column formulas built once from <see cref="ColumnInternalCodes"/> and assigned
        /// to every request this class sends.
        /// </summary>
        // NOTE(review): the type arguments here are assumed to be <string, string>; confirm
        // against the declared type of TopListRequest.ExtraColumnFormulas.
        private static readonly Dictionary<string, string> Columns;

        /// <summary>
        /// Builds <see cref="Columns"/> by adding every internal code to a throwaway request
        /// and keeping the formulas it produced.
        /// </summary>
        static QuoteFeedManager()
        {
            TopListRequest sample = new TopListRequest();
            foreach (string internalCode in ColumnInternalCodes)
            {
                sample.AddExtraColumn(internalCode);
            }
            Columns = sample.ExtraColumnFormulas;
            // TODO what about four_digits?  Will the price alerts (and others) use that if we have it?
        }

        /// <summary>
        /// Send manager used for every quote feed request.
        /// </summary>
        private static readonly ISendManager _sendManager = GuiEnvironment.FindConnectionMaster("").SendManager;

        /// <summary>
        /// Starts a streaming quote feed for the symbol if one doesn't already exist.
        /// </summary>
        /// <param name="symbol">Symbol to add quote feed for.</param>
        public void AddQuoteFeed(string symbol)
        {
            //if (!Application.MessageLoop)
            //    System.Diagnostics.Debug.WriteLine("A good place for a breakpoint.");
            //System.Diagnostics.Debug.Assert(Application.MessageLoop, "Must call this in the GUI thread.");
            if (_listeners.ContainsKey(symbol))
            {
                // Already watching this symbol.
                return;
            }

            TopListRequest request = new TopListRequest();
            request.ExtraColumnFormulas = Columns;
            request.OutsideMarketHours = true;
            request.SingleSymbol = symbol;
            request.Streaming = true;
            request.UseDatabase = false;

            // The token returned by Send() is what OnRowData() / OnDisconnect() compare
            // against to decide whether a callback belongs to the current request.
            _listeners.TryAdd(symbol, request.Send(_sendManager, this));
        }

        /// <summary>
        /// Returns the current rowdata object that's most recently been received by the QuoteFeedManager.
        /// </summary>
        /// <param name="symbol">Symbol to look up.</param>
        /// <returns>The latest row stored for the symbol, or null if none has been stored.</returns>
        public RowData GetCurrentValues(string symbol)
        {
            // One atomic lookup instead of ContainsKey() followed by the indexer.
            RowData current;
            if (_currentValueStore.TryGetValue(symbol, out current))
            {
                return current;
            }
            return null;
        }

        /// <summary>
        /// Receives rows for a request.  A callback is accepted only if it carries exactly one
        /// row and its token is the one currently registered for the request's symbol; the row
        /// is then stored and <see cref="QuoteFeedDataReceived"/> is raised.
        /// </summary>
        void TopListRequest.Listener.OnRowData(List<RowData> rows, DateTime? start, DateTime? end, TopListRequest.Token token)
        {
            if (rows == null || rows.Count != 1)
            {
                // Tempting to report something to the log.
                return;
            }

            RowData rowData = rows[0];
            TopListRequest request = token.GetRequest();
            request.LastUpdate = DateTime.Now;
            string symbol = request.SingleSymbol;

            // A single TryGetValue() is required here.  OnDisconnect() removes entries from
            // _listeners inside its marshalled delegate, so a ContainsKey() check followed by
            // the indexer could throw KeyNotFoundException if the entry vanished in between.
            TopListRequest.Token current;
            if (_listeners.TryGetValue(symbol, out current) && token == current)
            {
                GuiEnvironment.BeginInvokeIfRequired(delegate
                {
                    _currentValueStore[symbol] = rowData;

                    // Copy the delegate first so an unsubscribe between the null test and
                    // the call cannot cause a NullReferenceException.
                    QuoteFeedDataReceivedHandler handler = QuoteFeedDataReceived;
                    if (handler != null)
                    {
                        handler(symbol, rowData);
                    }
                });
            }
        }

        /// <summary>
        /// Meta data is not used by this class.
        /// </summary>
        void TopListRequest.Listener.OnMetaData(TopListRequest.Token token)
        {
            // Ignore.  We do not provide a collaborate string so we should not receive meta data.
        }

        /// <summary>
        /// Restarts the quote feed for the disconnected request's symbol, provided the token is
        /// still the one registered for that symbol.
        /// </summary>
        void TopListRequest.Listener.OnDisconnect(TopListRequest.Token token)
        {
            // Currently we never stop one of these.  So if we get OnDisconnect(), it's a network
            // issue, and we should retry.
            GuiEnvironment.BeginInvokeIfRequired(delegate
            {
                string symbol = token.GetRequest().SingleSymbol;

                TopListRequest.Token current;
                if (_listeners.TryGetValue(symbol, out current) && token == current)
                {
                    // Use a separate local for the removed value so the captured 'token'
                    // parameter is not overwritten.
                    TopListRequest.Token removed;
                    _listeners.TryRemove(symbol, out removed);
                    // TODO we should reuse the last request.  In particular we want to save the LastUpdate field.
                    AddQuoteFeed(symbol);
                }
            });
        }
    }
}