using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading; using TradeIdeas.ServerConnection; namespace TradeIdeas.MarketDataProxy { public class SubscriptionItem { private string _command; private string _symbol; public SubscriptionItem(string command, string symbol) { _command = command; _symbol = symbol; } public string Command { get { return _command; } } public string Symbol { get { return _symbol; } } // This well known command requests TOS and L1 data. public static readonly string L1_COMMAND = "Add-L1"; // This is a very specialized item which only requests NYSE bid and ask data for stocks listed on the NYSE. public static readonly string REGIONAL_COMMAND = "Add-Regional"; } public class ConnectionDescription { // In the Delphi version you could change these at any time. // Because we allow more things with threads, you can only set // these in the constructor. But it would be inconvient for // the constructor to have this many arguments. This keeps // the convenience of setting the parameters by name, in any // order, and using defaults for the ones you don't care about. public string username; public string password; public string hostname; public int port; public delegate void Status(ConnectionListener source, string status); public Status onStatusChanged; public delegate void Connection(ConnectionListener source); public Connection onConnected; public Connection onDisconnected; public delegate void Subscription(ConnectionListener source, List data); public Subscription onSubscription; public delegate void Preview(ConnectionListener source, byte[] body); public Preview onPreview; public ConnectionDescription freeze() { // The whole point of this class is to pass in parameters // that don't change. We do a memberwise clone to make // sure that no one tries to cheat by changing this object // after giving it to us. return (ConnectionDescription)MemberwiseClone(); } } public class ConnectionListener : TalkWithServer.IListener { // ConnectionListener is responsible for keeping a connection to // the server open. TalkWithSever is the next level down. This // class will automatically create a new instance of // TalkWithServer if we go down unexpectedly. This class is // responsible for pinging the server, which is one of the ways // that we know the connection went down. This class is // responsible for logging in, which may be required each time // we connect. (Some servers ignore the login message.) And // this class listens for a special response from the server // telling us not to try to log in again automatically. // // Most programs which use TalkWithServer do not let random parts // of the program talk directly to TalkWithServer. There is // usually a class like this responsible for the lifecycle of // TalkWithServer. The details, however, vary from one program // to the next. This class is usually cloned and modified, where // TalkWithServer is referenced and not modified. // // The Dephi version included a hard reset. We don't support // that directly. If you want to start fresh, disconnect this // object and create a new one. The Delphi version also made // more use of the soft reset. You could change certain // parameters in this object. The server required a restart // at that time, but this class hid that by doing a soft reset. // We don't bother with that here, but it might make some sense // if and when we reproduce the ActiveX API in C#. private ConnectionDescription _connectionDescription; private volatile bool _connectionHalted; /* _sessionId is used by the server to ensure that only one * client is active at a time. This is a majic cookie to us, * and not much more than a random number generator to the * server. */ private string _sessionId; private int _inactiveCount; private static readonly String[] DELIMITER = { "\r\n" }; private void AccountStatusResponse(byte[] body, object unused) { if (body == null) return; String wholeMessage = TalkWithServer.BytesToStr(body); String[] lines = wholeMessage.Split(DELIMITER, 0); if (lines.Length < 1) return; if (lines[0] == "STOP") { _connectionHalted = true; if (lines.Length > 1) { // TODO SetStatus // SetStatus(lines[1]) } } else if ((lines.Length > 1) && (lines[0] == "SESSION ID")) { _sessionId = lines[1]; } } private static string[] MAKE_LINES = { "\r\n" }; private static char[] PARSE_COMMAND = { ':' }; private void SubscriptionResponse(byte[] body, object unused) { if (_connectionDescription.onSubscription == null) // This is the unlikely case where someone doesn't care about subscriptions. return; if (body == null) // Error. Presumably someone who cared would have seen a disconnect message, so we don't need // to report this. return; List result = new List(); foreach (string line in Encoding.ASCII.GetString(body).Split(MAKE_LINES, StringSplitOptions.RemoveEmptyEntries)) { string[] pieces = line.Split(PARSE_COMMAND, 2); if ((pieces.Length != 2) || (pieces[0] == "") || (pieces[1] == "")) // Silently skip anything that doesn't look right. That's mostly aimed at blank lines. continue; result.Add(new SubscriptionItem(pieces[0], pieces[1])); } _connectionDescription.onSubscription(this, result); } private TalkWithServer _serverConnection; private TalkWithServer GetServerConnection() { return GetServerConnection(true); } private TalkWithServer GetServerConnection(bool createIfRequired) { lock(this) { if ((_serverConnection == null) && createIfRequired) { // Create Server Connection ResetPingTime(); TcpIpConnectionFactory connectionFactory = new TcpIpConnectionFactory(_connectionDescription.hostname, _connectionDescription.port); _serverConnection = new TalkWithServer(connectionFactory, this); _serverConnection.Connect(); if (!_connectionHalted) { // Log in _serverConnection.SendMessage (TalkWithServer.CreateMessage("command", "login", "username", _connectionDescription.username, "password", _connectionDescription.password, "session_id", _sessionId), new TalkWithServer.Response(AccountStatusResponse), true); _serverConnection.SendMessage (TalkWithServer.CreateMessage("command", "subscription"), new TalkWithServer.Response(SubscriptionResponse), true); } } return _serverConnection; } } void TalkWithServer.IListener.OnDisconnected(String errorMessage) { if (_connectionDescription.onDisconnected != null) try { // SetStatus(errorMessage); TODO _connectionDescription.onDisconnected(this); } catch { } } void TalkWithServer.IListener.OnAutoRetry(String errorMessage) { } void TalkWithServer.IListener.OnMessagePreview(byte[] body) // body is null on error. { if (_connectionDescription.onPreview != null) _connectionDescription.onPreview(this, body); } /* Ping is required. The remote side will break the connection if it * does not hear from us on a regular basis. This program sends data * when it is available, and might not send any messages if there is * no interesting data. * * Some programs go further and report a ping time. That does not * seem very helpful here. This was made to talk to another computer * in the same server room. */ private void PingServer() { GetServerConnection().SendMessage (TalkWithServer.CreateMessage("command", "ping", "response", "1"), new TalkWithServer.Response(PingResponse)); } private void ResetPingTime() { // This is a placeholder. Some versions of this file will // keep track of how long a ping takes. } private void PingResponse(byte[] body, object unused) { if (body != null) { lock (this) { _inactiveCount = 0; } //SetStatus('Working: ' + TimeToStr(Now)); } } // Export these so we don't have to export our instance of TTalkWithServer. public void SendMessage(Dictionary message, TalkWithServer.Response response, bool streaming) { GetServerConnection().SendMessage(message, response, streaming); } public void SendMessage(Dictionary message, TalkWithServer.Response response) { SendMessage(message, response, false); } public void SendMessage(Dictionary message) { SendMessage(message, null, false); } private void SoftReset() { lock (this) { if (_serverConnection != null) { _serverConnection.Disconnect(); _serverConnection = null; _inactiveCount = 0; } } } private void DoTimerThread() { while (!_connectionHalted) { lock (this) { if ((_serverConnection != null) && (_serverConnection.GetStatus() == TalkWithServer.Status.scsDisconnected)) SoftReset(); _inactiveCount++; if (_inactiveCount < 21) PingServer(); else SoftReset(); } Thread.Sleep(500); } lock (this) { if (_serverConnection != null) _serverConnection.Disconnect(); } } public ConnectionListener(ConnectionDescription connectionDescription) { _connectionDescription = connectionDescription.freeze(); } public void Connect() { // Usual reason: This could be part of the constructor, but then // you might get callbacks before saving this object in a variable. // Do not try to send any messages before connecting. Thread thread = new Thread(new ThreadStart(DoTimerThread)); thread.IsBackground = true; thread.Start(); } public void Disconnect() { // Throw away the resources. Can't use this object again after a disconnect. // It is safe to call this more than once. _connectionHalted = true; //SoftReset(); // Calling SoftReset from the GUI lead to a deadlock. The thread will // disconnect the socket soon enough, and that's all we really need. } ~ConnectionListener() { Disconnect(); } } }