using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using TradeIdeas.MiscSupport;
using System.Threading;
using System.Web;
using System.Security.Cryptography;
using TradeIdeas.TIProData.Interfaces;
namespace TradeIdeas.TIProData
{
///
/// Use this to request streaming data about one specific symbol. Request any fields
/// you want, including standard fields or custom formulas.
///
/// Despite the size of this file, the interface is very small.
/// o StockDataManager.Find() -- Finds the appropriate object to start the process.
/// o StockDataManager.Subscribe() -- Request streaming data. Returns a Token.
/// o StockDataManager.Token.Cancel() -- Stop the request and clean up.
/// StockDataManager.Token contains other data, too, including a look at the
/// original request.
/// o StockDataManager.MakeRowDataKey() -- Required to look up the result in the
/// callback.
///
/// This is responsible for merging similar requests. If two people both ask for the
/// price of MSFT, we only send one request to the server but we share the results
/// with both listeners. If one person asks for the price and volume of MSFT, and
/// someone else asks for the price and 5 minute range of MSFT, they will both share
/// a single request which is the union of both requests.
///
/// When you cancel a request, that might cancel the request to the server. Or it
/// might replace the current request with a simpler one. When you ask for new data,
/// or cancel data, we might try to look for other requests and group them all together.
///
/// If you want data that doesn't change often, like a the company name or sector for
/// a stock symbol, look at SymbolDetailsCacheManager.cs. If you want data for a lot
/// of stocks look at TopListRequest. If you want similar data but for a historical
/// time, look at TopListManager.
///
/// This was inspired by QuoteFeedManager. This will hopefully replace QuoteFeedManager.
///
/// From the outside this looks like most classes in TIPro data. You can make a new
/// request or cancel an old request from any thread. You will get callbacks and we
/// make no promises about what thread you will be in.
///
/// On the inside this looks like our C++ server code. And it looks like a lot of C#
/// GUI code. We automatically create one thread just for us. All external operations
/// (e.g. a user requests data from us, or the server gives data to us) are send to
/// this thread. Only this thread can access most variables and methods. The only
/// lock, and the only data shared with other threads, is a queue that sends messages
/// to this thread.
///
public class StockDataManager
{
///
/// Given a custom formula, come up with an apporpriate name. Use this name when
/// looking for data in the RowData object that we provide with each callback.
///
/// If two people both request the same custom formula, even without knowing it,
/// their requests should be merged.
///
/// Some names are somewhat standard and used in a lot of places in the code.
/// Library routines, like when you right click on a row and a "send to" menu
/// appears, expect to see certain fields with standard names.
///
/// This is not as fast as I'd like. If convenient you should store a copy of
/// this in a variable.
///
///
/// The formula that you want the server to compute. The same value that you
/// sent to Subscribe().
///
///
public static string MakeRowDataKey(string formula)
{
if ((formula.Length > 1) && (formula[0] == '[') && (formula[formula.Length - 1] == ']'))
{
string middle = formula.Substring(1, formula.Length - 2);
bool valid = true;
foreach (char ch in middle)
if (!((ch == '_') || Char.IsLetterOrDigit(ch)))
{
valid = false;
break;
}
if (valid)
// This looks like a simple filter and nothing else. This type
// of request can come from a lot of places and it usually has
// the same key.
return CommonAlertFields.GetFilterWireName(middle);
}
// The size is fixed. The result is unreadable. It is possible, but highly
// unlikely, that two inputs will return the same key. The alternatives
// were less appealing. We used to use a simple encoding where keys can
// get very long. You could try to make a short key like "K1", "K2", etc.
// To make that work you'd need to keep a complete list of translations,
// which could look like a memory leak. To avoid the memory leak you could
// make each piece of code that uses a key increment a reference counter, and
// decrement it when it was done. That would a serious pain, if it was
// possible at all.
return MakeHashedRowDataKey(formula);
}
private static string MakeHashedRowDataKey(string formula)
{
byte[] resultAsBytes;
using (SHA1 hasher = SHA1.Create())
resultAsBytes = hasher.ComputeHash(Encoding.UTF8.GetBytes(formula));
string result = Convert.ToBase64String(resultAsBytes);
result = result.Replace('+', '_').Replace('/', '.').Replace("=", "");
return "F_" + result;
}
[ThreadStatic]
static bool _correctThread;
private static void DoInThread(Action action)
{
lock (_eventQueue)
{
_eventQueue.Enqueue(action);
}
_eventsAvailable.Set();
}
private static void AssertInThread()
{
System.Diagnostics.Debug.Assert(_correctThread);
}
///
/// You get a new one of these each time you call Subscribe(). You can use == or != on it
/// when you get a response to make sure the response came from the correct call to
/// Subscribe().
///
/// This was inspired by TopListRequest.Token. Among other things, this groups together
/// a lot of data all related to the request that created it.
///
public interface Token
{
///
/// Cancel the given request. It is safe to call this even after the request
/// has already been canceled.
///
void Cancel();
///
/// This can be used to distinguish between two requests made by the same
/// listener. Normally you can use the Token object for that. However, in a multi-
/// threaded environment this could still help. You don't know the identity of
/// the Token object until Send() returns. But the result of a send might appear
/// in another thread before Send() returns.
///
/// This can be used for anything. It might be null. This will be whatever value
/// you gave to Send().
///
object ClientInfo { get; }
///
/// From the original request.
///
String Symbol { get; }
///
/// From the original request.
/// The details of this request have not been defined.
/// Internally we use a set of strings. Ideally you'd get a read-only
/// version of this set. C# doesn't make a simple read only wrapper for
/// sets. It would be dangerous to share the non-read-only version, but
/// it would be slow to make copies that might never be used. For now
/// I'm just copying an object that we already have, the array of strings.
///
String[] Formulas { get; }
}
public class Response
{
public RowData RowData;
public Token Token;
// Time? Probably not that interesting. For a single symbol request
// the server pushes things out as soon as it gets new data. So the time
// associated with the response is probably the current time plus the
// network latency. Maybe there's delayed data for someone who didn't
// pay. Maybe (if we add that option) we're looking at data frozen at
// the close. Maybe we're getting an update in the middle of the night
// because we asked for it, and the last data is from long ago.
}
///
/// Pretend like we got this record from the server. Send it to any and all listeners.
/// This will be sent to another thread, exactly like data from the server.
///
/// This message will go to anyone subscribed to this symbol.
/// The field names and values, url encoded.
public void DebugSend(string symbol, string encodedValues)
{
RowData rowData = new RowData();
var parsed = HttpUtility.ParseQueryString(encodedValues);
foreach (string key in parsed)
rowData.Data[key] = parsed[key];
DebugSend(symbol, rowData);
}
///
/// Pretend like we got this record from the server. Send it to any and all listeners.
/// This will be sent to another thread, exactly like data from the server.
///
/// This message will go to anyone subscribed to this symbol.
/// This will be sent to all listeners.
public void DebugSend(string symbol, RowData rowData)
{
if (!rowData.Data.ContainsKey(CommonAlertFields.SYMBOL))
rowData.Data[CommonAlertFields.SYMBOL] = symbol;
DoInThread(delegate { DebugSendInThread(symbol, rowData); });
}
private void DebugSendInThread(string symbol, RowData rowData)
{
SymbolRequest symbolRequest;
_allRequests.TryGetValue(symbol, out symbolRequest);
if (null != symbolRequest)
symbolRequest.SendRowData(rowData);
}
private class TokenImpl : Token
{
public override string ToString()
{
StringBuilder sb = new StringBuilder();
sb.Append("StockDataManager.TokenImpl(").Append(Symbol);
foreach (string formula in _formulas)
sb.Append(", ").Append(formula);
sb.Append(")");
return sb.ToString();
}
public void Cancel()
{
DoInThread(delegate
{
SymbolRequest request;
if (Owner._allRequests.TryGetValue(Symbol, out request))
{ // If we didn't get this far, the owner was already released.
// That would mean that all listeners were already canceled.
// That would mean that this cancel request is a duplicate and
// we should silently ignore it.
request.Cancel(this);
}
});
}
private readonly object _clientInfo;
public object ClientInfo
{
get { return _clientInfo; }
}
private readonly String _symbol;
public string Symbol
{
get { return _symbol; }
}
private readonly string[] _formulas;
public string[] Formulas
{
get { return _formulas; }
}
public readonly HashSet FormulaSet;
public readonly Action Callback;
public readonly StockDataManager Owner;
public TokenImpl(Action callback, object clientInfo, String symbol,
string[] formulas, HashSet formulaSet, StockDataManager owner)
{
Callback = callback;
_clientInfo = clientInfo;
_symbol = symbol;
_formulas = formulas;
FormulaSet = formulaSet;
Owner = owner;
}
}
public Token Subscribe(Action callback, object clientInfo, string symbol, params string[] formulas)
{
HashSet formulaSet = new HashSet(formulas);
TokenImpl result = new TokenImpl(callback, clientInfo, symbol, formulas, formulaSet, this);
DoInThread(delegate
{
SymbolRequest request;
_allRequests.TryGetValue(symbol, out request);
if (null == request)
{
request = new SymbolRequest(symbol, this);
_allRequests[symbol] = request;
}
request.Add(result);
});
return result;
}
private readonly ISendManager _sendManager;
private StockDataManager(ISendManager sendManager)
{
_sendManager = sendManager;
}
static private Dictionary _instances =
new Dictionary();
static public StockDataManager Find(ISendManager sendManager)
{
lock (_instances)
{
StockDataManager result;
if (_instances.TryGetValue(sendManager, out result))
return result;
result = new StockDataManager(sendManager);
_instances.Add(sendManager, result);
return result;
}
}
static private StockDataManager _main;
static public StockDataManager Find()
{
if (null == _main)
_main = Find(ConnectionMaster.First.SendManager);
return _main;
}
///
/// We merge all user requests for the same symbol. We should never have more than one of these per symbol.
/// The user doesn't directly see this.
///
private class SymbolRequest : TopListRequest.Listener
{
///
/// Start with a FIFO. The first item you Add() will be the first item you Get().
///
/// This queue can only have one copy of each item at a time. If you try to add
/// an item which is already in the list, that operation is silently ignored. Note
/// that the item keeps its place in the list and is not moved to the back of the
/// list. We absolutely keep the original and ignore the new value.
///
/// You get this and a lot more from a Java LinkedHashSet. StackOverflow has
/// a full implemention of that data structure, but it's huge and barely tested.
/// http://stackoverflow.com/questions/9346526/what-is-the-equivalent-of-linkedhashset-java-in-c
///
private class UniqueFifo where T : class
{
private readonly Queue _fifo = new Queue();
private readonly HashSet _unique = new HashSet();
public void Add(T item)
{
if (_unique.Add(item))
_fifo.Enqueue(item);
}
///
/// Find the oldest item in the queue. Remove it and return it.
///
/// Once you Get() and item it's safe to add the item again.
///
/// The oldest item in the queue, or null if the queue is empty.
public T Get()
{
if (_unique.Count == 0)
return null;
T result = _fifo.Dequeue();
_unique.Remove(result);
return result;
}
}
// Sometimes we defer an operation. If someone makes a change, we wait and see if a related change is
// coming before doing anything. This is an implementation detail, like caching.
private static readonly UniqueFifo _needsAttention = new UniqueFifo();
// This will execute one item and return true. Or if there's no work, it returns false.
// This is meant to be low priority. We should be able to stop in the middle of our idle
// tasks if new high priority work appears.
public static bool OnIdle()
{
SymbolRequest request = _needsAttention.Get();
if (null == request)
return false;
request.UpdateYourself();
return true;
}
private void UpdateYourself()
{
AssertInThread();
// Assume for simplicity that there was a real change. It's certainly possible
// that someone added a new field and then deleted it before we got a chance to
// do anything with it. So the new request will be identical to the current
// request. We could check for that, but it seems rare and unlikely.
if (null != _topListToken)
_topListToken.Cancel();
_topListToken = null;
if (_allUserRequests.Count == 0)
{ // The main program isn't listening any more. No one for us to forward
// anything to. No useful data for us to save.
//
// Note: No listeners should imply no data. If the last listener went
// away and we think we still need data for the Price field, that's a
// serious error. And it's an error that's internal to this file. However,
// the inverse is not necessarily true. Someone can ask to listen without
//listening for any specific fields. This could be useful in debugging.
System.Diagnostics.Debug.Assert(_refereneceCounts.Count == 0);
_owner.Remove(this);
}
else
{ // TODO did we add or remove fields? If we added at least one field we should
// immediately request more data. If we only removed fields, we should wait
// until the next item was due, just like if we automatically reconnect.
TopListRequest request = new TopListRequest();
request.ExtraColumnFormulas = new Dictionary();
foreach (var kvp in _refereneceCounts)
request.ExtraColumnFormulas[MakeRowDataKey(kvp.Key)] = kvp.Key;
// TODO if we are removing fields, not adding any fields, set the LastUpdate.
// request.LastUpdate =
// For simplicity I'm always looking at the current value. Maybe there is
// a need for data that freezes at the close. If so, we'd need to seperate
// SymbolRequest objects based on this, not just the symbol.
request.OutsideMarketHours = true;
request.SingleSymbol = _symbol;
request.Streaming = true;
// There's no good way to know for sure if we need it, so I'll always ask
// for the database. This is a one time cost. (Once per subscription.)
// This choice was mostly aimed at the one-time requests for the exchange or
// company name associated with a symbol. The server gets a lot of those at
// once, and skipping the database could make a bigger difference.
request.UseDatabase = false;
_topListToken = request.Send(_owner._sendManager, this);
}
}
public void Add(TokenImpl userRequest)
{
AssertInThread();
bool newlyAdded = _allUserRequests.Add(userRequest);
System.Diagnostics.Debug.Assert(newlyAdded);
bool somethingChanged = false;
int referenceCount;
foreach (string formula in userRequest.FormulaSet)
{
_refereneceCounts.TryGetValue(formula, out referenceCount);
if (referenceCount == 0)
somethingChanged = true;
referenceCount++;
_refereneceCounts[formula] = referenceCount;
}
if (somethingChanged)
_needsAttention.Add(this);
}
private HashSet _allUserRequests = new HashSet();
private readonly string _symbol;
private readonly StockDataManager _owner;
private readonly Dictionary _refereneceCounts = new Dictionary(); // Formula -> reference count.
private TopListRequest.Token _topListToken;
public string GetSymbol()
{
return _symbol;
}
public SymbolRequest(String symbol, StockDataManager owner)
{
_symbol = symbol;
_owner = owner;
}
public void Cancel(TokenImpl userRequest)
{
AssertInThread();
if (!_allUserRequests.Remove(userRequest))
// Not found. This is explicitly allowed. Possibly a duplicate call.
return;
bool somethingChanged = false;
foreach (String formula in userRequest.FormulaSet)
{
int referenceCount = _refereneceCounts[formula];
referenceCount--;
System.Diagnostics.Debug.Assert(referenceCount >= 0);
if (referenceCount == 0)
{
_refereneceCounts.Remove(formula);
somethingChanged = true;
}
else
_refereneceCounts[formula] = referenceCount;
}
if (somethingChanged)
_needsAttention.Add(this);
}
void TopListRequest.Listener.OnRowData(List rows, DateTime? start, DateTime? end, TopListRequest.Token token)
{
DoInThread(delegate
{
if (rows.Count == 0)
// No matching data. Probably this was an invalid symbol. For now, ignore it.
// Maybe we should send this info the the caller somehow?
return;
else if (rows.Count > 1)
{ // Some type of serious confusion. Probably a bug here or in the server. No
// way to know which row is correct, so just ignore it.
System.Diagnostics.Debug.WriteLine("Too many rows returned in StockDataManager.cs.");
return;
}
SendRowData(rows[0]);
});
}
///
/// This is public only for debug purposes. The sends the given row data to any listeners.
///
///
public void SendRowData(RowData rowData)
{
AssertInThread();
foreach (TokenImpl userRequest in _allUserRequests)
try
{
Response response = new Response();
response.RowData = rowData;
response.Token = userRequest;
userRequest.Callback(response);
}
catch (Exception ex)
{ // An exception in someone else's code. That person is no longer sitting
// on the stack above us. Nothing to do but report it and move on.
System.Diagnostics.Debug.WriteLine("Exception in external callback in StockDataManager.cs: " + ex);
}
}
void TopListRequest.Listener.OnMetaData(TopListRequest.Token token)
{ // The server should only send this if ((we have a collaborate string) AND
// (we ask for this)). We could just ignore this. But if it happens
// I'd like to know.
System.Diagnostics.Debug.WriteLine("Unexpected metadata in StockDtaManager.");
}
void TopListRequest.Listener.OnDisconnect(TopListRequest.Token token)
{
DoInThread(delegate
{ // This is the same routine as when someone adds or deletes a user request.
// This will shut down and throw out the old server request (if it exists)
// and build and send a new one (if required).
_needsAttention.Add(this);
});
}
};
private readonly Dictionary _allRequests = new Dictionary();
private static readonly Queue _eventQueue = new Queue();
private static readonly ManualResetEventSlim _eventsAvailable = new ManualResetEventSlim(false);
private static void MainLoop()
{
_correctThread = true;
while (true)
{
while (true)
{
Action nextEvent;
lock (_eventQueue)
{
if (_eventQueue.Count == 0)
break;
else
nextEvent = _eventQueue.Dequeue();
}
try
{
nextEvent();
}
catch (Exception ex)
{ // Presumably a bug in this file.
System.Diagnostics.Debug.WriteLine("Unexpected exception in internal StockDataManager callback: " + ex);
}
}
if (SymbolRequest.OnIdle())
{ // We did some work from our list of low priority jobs. Go back to the top of the
// loop and see if there's any high priority work. Eventually get back here to check
// for more low priority work.
}
else
{ // No more work found. Go to sleep until someone tells us we have more work.
_eventsAvailable.Wait();
_eventsAvailable.Reset();
}
}
}
static StockDataManager()
{
Thread t = new Thread(MainLoop);
t.IsBackground = true;
t.Start();
}
private void Remove(SymbolRequest symbolRequest)
{
AssertInThread();
_allRequests.Remove(symbolRequest.GetSymbol());
}
}
}