using System;
using System.Collections.Generic;
using System.Text;
using System.Net;
using System.Net.Sockets;
using System.Threading;
using System.Diagnostics;
using Org.Mentalis.Network.ProxySocket;

namespace TradeIdeas.ServerConnection
{
    /// <summary>
    /// Notifications raised by an <see cref="IConnection"/>.  The callbacks may arrive on
    /// any thread, typically not the thread that created the connection.
    /// </summary>
    public interface ConnectionCallbacks
    {
        /// <summary>Called once the underlying socket has connected successfully.</summary>
        void OnConnected(IConnection connection);

        /// <summary>
        /// Called whenever data has been received.  For performance reasons the array may
        /// be bigger than needed.  The data goes from <c>read[0]</c> through
        /// <c>read[count-1]</c>.  The buffer is reused as soon as this call returns.
        /// </summary>
        void OnRead(byte[] read, int count, IConnection connection);

        /// <summary>Called at most once per connection, when the connection is closed.</summary>
        void OnClosed(IConnection connection);

        /// <summary>Reports an automatic retry.  Not raised by <see cref="TcpIpConnection"/> itself.</summary>
        void OnAutoRetry(IConnection source, String msg);

        /// <summary>Reports an exception, e.g. a failure while connecting.</summary>
        void OnError(Exception ex);
    }

    /// <summary>
    /// Minimal operations every connection supports.
    /// </summary>
    public interface IConnection
    {
        void Disconnect();
        void Connect();
        void Send(List<byte[]> messages);
        void Send(byte[] message);
    }

    /// <summary>
    /// This is a simple implementation of the Connection class.  The connection class is a
    /// wrapper around a socket.  All I/O is asynchronous.  A connection object owns exactly
    /// one socket and cannot be reused.  If you want to open another connection, create
    /// another connection object.
    /// </summary>
    /// <remarks>
    /// <para>
    /// You must supply a callback object.  Whenever data is available, you will receive the
    /// read callback.  For simplicity and efficiency the callback includes an array of
    /// bytes and a size.  The data always starts with byte 0.
    /// </para>
    /// <para>
    /// The read callback can come from any thread.  Presumably it will not be the same
    /// thread that created the object.
    /// </para>
    /// <para>
    /// The connection will not read any more data until the read callback returns.  The
    /// buffer will be reused as soon as the read callback returns, so copy anything you
    /// need to keep.
    /// </para>
    /// <para>
    /// You cannot cancel the callbacks.  However, each callback includes a reference to the
    /// connection object.  This allows you to compare the connection making the callback to
    /// the connection you are currently using.  You can then ignore any results from old
    /// connections.
    /// </para>
    /// <para>
    /// You can send data from any thread.  Ultimately, we always send a stream of bytes.
    /// You can also send a list of byte arrays.  This makes it convenient for the caller to
    /// accumulate the output, and send it all at once.  Also, all items in the list are
    /// sent consecutively, even if multiple threads are all calling Send() at once.
    /// </para>
    /// <para>
    /// You can disconnect the socket from any thread.  After the socket has been
    /// disconnected (from either side) future attempts to disconnect or to send data will
    /// be ignored.
    /// </para>
    /// <para>
    /// There is no way to ask for the status of a socket from another thread.  I thought
    /// this would add more confusion than anything else because the status can change
    /// asynchronously.  Internally, we have a status variable, which is protected by a
    /// lock, but the lock is not accessible to external users.  An external user can
    /// implement <see cref="ConnectionCallbacks.OnClosed"/> to get some idea of the status.
    /// </para>
    /// <para>
    /// Error reporting is minimal: connection failures are passed to
    /// <see cref="ConnectionCallbacks.OnError"/>.  It would make some sense to have a read
    /// only field reporting the reason for the disconnect.  This might say something like
    /// "Could not find IP address for ...", "Connection timed out", "Connection closed by
    /// other side", etc.
    /// </para>
    /// <para>
    /// To test the proxy settings, we need to install a proxy server.  We were able to use
    /// CCProxy for this testing.  You can download from:
    /// http://download.cnet.com/CC-Proxy-Server/3000-2155_4-10062250.html
    /// This proxy server software appears to be compatible with Windows 7/Vista/XP.  During
    /// our testing the proxy server was installed on a laptop running Vista.  CCProxy
    /// allows the user to create user accounts requiring username/password authentication
    /// (demo version allows for 3 users) under the Accounts settings.  CCProxy supports
    /// SOCKS 4 and 5 services.  The default port is 1080 but the user can change this under
    /// Options.  Remember that authentication via the SOCKS protocol requires SOCKS 5 to be
    /// selected in TIPro.
    /// </para>
    /// <para>
    /// To test TIPro set the following parameters in Options - Connections:
    ///   Connection: Primary;
    ///   Proxy Type: Socks 5;
    ///   Host: (IP address or name of computer running the proxy server);
    ///   Port: (1080 or another available port);
    ///   Username/Password: (same as that defined in the CCProxy user account).
    /// </para>
    /// </remarks>
    public class TcpIpConnection : IConnection
    {
        private enum Status { created, connectionStarted, connected, disconnected };

        private volatile Status _status = Status.created;
        private readonly ConnectionCallbacks _connectionCallbacks;
        private readonly string _address;
        private readonly int _port;

        // The live socket.  Null before the connection succeeds and after Disconnect().
        // Written only while holding the _sendQueue lock.
        private Socket _s;

        // Text of the most recent connection failure.  Not read anywhere in this file;
        // kept so it can be inspected in a debugger.
        private string _lastError;

        /// <summary>
        /// Create a new connection.  Nothing happens on the network until
        /// <see cref="Connect()"/> is called.
        /// </summary>
        /// <param name="address">Host name or IP address of the server.</param>
        /// <param name="port">TCP port of the server.</param>
        /// <param name="connectionCallbacks">Receiver for all notifications.  Must not be null.</param>
        public TcpIpConnection(String address, int port, ConnectionCallbacks connectionCallbacks)
        {
            // This makes things simpler, and I can't imagine any reason we should allow a null here.
            Debug.Assert(null != connectionCallbacks);
            _connectionCallbacks = connectionCallbacks;
            _address = address;
            _port = port;
        }

        /// <summary>
        /// Close the connection.  Safe to call from any thread and safe to call more than
        /// once; <see cref="ConnectionCallbacks.OnClosed"/> is reported only the first time.
        /// </summary>
        public void Disconnect()
        {
            lock (_sendQueue)
            {
                Socket s = _s;
                // Notice that we check the socket, not the status.  It's possible that someone will
                // try to disconnect while the connection routine is still running.  The last thing the
                // connection routine does is check for a disconnect and, if it finds one, discard the
                // socket it just opened.
                if (s != null)
                {
                    _s = null;
                    try
                    {
                        if (s.Connected)
                        {
                            s.Shutdown(SocketShutdown.Both);
                        }
                    }
                    catch (ObjectDisposedException)
                    {
                        // Strange.  I get an exception here if the program is shutting down when
                        // Disconnect() is called.  if (s.Connected) was still true.  I don't know how
                        // else to check for this case.
                    }
                    catch (SocketException)
                    {
                        // The peer already tore the connection down.  Nothing left to shut down.
                    }
                    finally
                    {
                        // Always release the handle, even when the peer dropped the connection and
                        // Connected is already false.
                        s.Close();
                    }
                }
                _status = Status.disconnected;
            }
            SendClose();
        }

        /// <summary>
        /// Last-chance cleanup: disconnects if the owner never did.
        /// </summary>
        ~TcpIpConnection()
        {
            try
            {
                Disconnect();
            }
            catch
            {
                // An exception escaping a finalizer terminates the process.
            }
        }

        /// <summary>
        /// Start connecting in the background.  This was separated from the constructor so
        /// you'd have time to store this object in a variable before any callback arrives.
        /// </summary>
        /// <exception cref="ApplicationException">Connect() was already called on this object.</exception>
        public void Connect()
        {
            lock (_sendQueue)
            {
                if (_status != Status.created)
                {
                    throw new ApplicationException("Only call connect once!");
                }
                _status = Status.connectionStarted;
                ThreadPool.QueueUserWorkItem(new WaitCallback(ConnectImpl));
            }
        }

        /// <summary>
        /// Convert a numeric address or a host name into an IP address.  Host names are
        /// resolved through DNS.  Every socket created by this class is IPv4
        /// (AddressFamily.InterNetwork), so the first IPv4 result is preferred; if DNS
        /// returns none, the first result of any family is returned.
        /// </summary>
        /// <param name="ipString">Numeric IP address or host name.</param>
        /// <returns>The parsed or resolved address.</returns>
        private static IPAddress MakeIPAddress(string ipString)
        {
            // TryParse never throws, so no exception handling is needed for the numeric case.
            if (IPAddress.TryParse(ipString, out var literal))
            {
                return literal;
            }

            IPAddress[] candidates = Dns.GetHostEntry(ipString).AddressList;
            foreach (IPAddress candidate in candidates)
            {
                if (candidate.AddressFamily == AddressFamily.InterNetwork)
                {
                    return candidate;
                }
            }
            return candidates[0];
        }

        /// <summary>
        /// Close a socket that never became (or no longer is) the active connection.
        /// </summary>
        private static void CloseQuietly(Socket socket)
        {
            if (socket == null)
            {
                return;
            }
            try
            {
                socket.Close();
            }
            catch
            {
                // Best effort.  The socket is being thrown away regardless.
            }
        }

        /// <summary>
        /// Thread pool entry point that performs the blocking connect, reports the result,
        /// and starts the asynchronous read and write loops.
        /// </summary>
        /// <param name="unused">Required by WaitCallback; ignored.</param>
        private void ConnectImpl(Object unused)
        {
            Socket s = null;
            try
            {
                s = CreateSocket();
                Connect(s, _address, _port);
                _connectionCallbacks.OnConnected(this);
            }
            catch (Exception ex)
            {
                // Release the half-open socket instead of leaving it for the garbage collector.
                CloseQuietly(s);
                s = null;
                _lastError = ex.ToString();
                try
                {
                    _connectionCallbacks.OnError(ex);
                }
                catch
                {
                    // Same policy as OnRead and OnClosed: a faulty callback must not take down
                    // the thread pool thread or prevent the close from being reported below.
                }
            }

            if (s == null)
            {
                // The socket is already dead, but report the close in the normal way.
                Disconnect();
                return;
            }

            bool disconnectedMeanwhile;
            lock (_sendQueue)
            {
                // Disconnect() may have been called while we were still connecting.  In that case
                // the close has already been reported, so we must not come back to life.
                disconnectedMeanwhile = (_status == Status.disconnected);
                if (!disconnectedMeanwhile)
                {
                    _s = s;
                    _status = Status.connected;
                }
            }

            if (disconnectedMeanwhile)
            {
                CloseQuietly(s);
                return;
            }

            StartListening();
            StartSend();
        }

        /// <summary>
        /// Perform the blocking connect, either directly or through the SOCKS proxy.
        /// </summary>
        private static void Connect(Socket s, string address, int port)
        {
            if (s is ProxySocket proxySocket)
            {
                // ProxySocket.Connect does not override Socket.Connect.  It is a "new" method.
                // It appears that everything but connection makes use of the socket interface,
                // but connection must be done like this.
                //
                // Send the address as a string.  Let the proxy server resolve that.  It might
                // be an internal address that the client can't resolve.  Also, the firewall
                // at Bloomberg doesn't let us do a DNS lookup.
                proxySocket.Connect(address, port);
            }
            else
            {
                IPEndPoint ipHost = new IPEndPoint(MakeIPAddress(address), port);
                s.Connect(ipHost);  // This causes an exception if the server app is not running!
            }
        }

        /// <summary>
        /// Create the socket for a new connection.  A <see cref="ProxySocket"/> is used when
        /// the static SOCKS settings are filled in; otherwise, or when configuring the proxy
        /// fails, a plain TCP socket is returned.
        /// </summary>
        private static Socket CreateSocket()
        {
            ProxySocket proxy = null;
            try
            {
                if ((SocksVersion != ProxyTypes.None) && (null != SocksAddress) && (null != SocksPort) && (SocksAddress != ""))
                {
                    proxy = new ProxySocket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
                    proxy.ProxyEndPoint = new IPEndPoint(MakeIPAddress(SocksAddress), Int32.Parse(SocksPort));
                    proxy.ProxyUser = SocksUsername ?? "";
                    proxy.ProxyPass = SocksPassword ?? "";
                    proxy.ProxyType = SocksVersion;
                    return proxy;
                }
            }
            catch
            {
                // NOTE(review): a bad proxy host or port silently falls back to a direct
                // connection.  That is the historical behavior; confirm it is still wanted.
                CloseQuietly(proxy);
            }
            return new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
        }

        public static string SocksUsername;
        public static string SocksPassword;
        public static string SocksAddress;
        public static string SocksPort;
        public static ProxyTypes SocksVersion = ProxyTypes.Socks5;

        /// <summary>
        /// Set SOCKS proxy version.  Values other than "4" and "5" are ignored.
        /// </summary>
        /// <param name="version">Socks version, "4" or "5".</param>
        public static void SetSocksVersion(string version)
        {
            if (version == "4")
            {
                SocksVersion = ProxyTypes.Socks4;
            }
            else if (version == "5")
            {
                SocksVersion = ProxyTypes.Socks5;
            }
        }

        // _sendQueue doubles as a mutex.
        private readonly List<byte[]> _sendQueue = new List<byte[]>();
        private bool _sendInProgress = false;

        /// <summary>
        /// Feel free to call Send() at any time.  If the socket is not connected, output will be
        /// queued up.  If the socket has already been disconnected, the data will be thrown away.
        /// </summary>
        /// <param name="message">Text to send, encoded as ASCII.</param>
        public void Send(String message)
        {
            Send(Encoding.ASCII.GetBytes(message));
        }

        /// <summary>
        /// Feel free to call Send() at any time.  If the socket is not connected, output will be
        /// queued up.  If the socket has already been disconnected, the data will be thrown away.
        /// </summary>
        /// <param name="messages">Byte arrays to send back to back, in order.</param>
        public void Send(List<byte[]> messages)
        {
            lock (_sendQueue)
            {
                _sendQueue.AddRange(messages);
            }
            StartSend();
        }

        /// <summary>
        /// Feel free to call Send() at any time.  If the socket is not connected, output will be
        /// queued up.  If the socket has already been disconnected, the data will be thrown away.
        /// </summary>
        /// <param name="message">Bytes to send.</param>
        public void Send(byte[] message)
        {
            lock (_sendQueue)
            {
                _sendQueue.Add(message);
            }
            StartSend();
        }

        // Number of times StartSend() swallowed an exception.  For debugging only.
        private int _debugCount;

        /// <summary>
        /// Push everything currently queued to the socket as one asynchronous send.  Does
        /// nothing while a send is in flight or before the connection is up, and discards
        /// the queue once the connection has been closed.
        /// </summary>
        private void StartSend()
        {
            lock (_sendQueue)
            {
                try
                {
                    if (_status == Status.disconnected)
                    {
                        _sendQueue.Clear();
                    }
                    else if ((_status == Status.connected) && (!_sendInProgress) && (_sendQueue.Count > 0))
                    {
                        // Flatten the queue into one buffer so it goes out in a single send.
                        int count = 0;
                        foreach (byte[] current in _sendQueue)
                        {
                            count += current.Length;
                        }
                        byte[] toSend = new byte[count];
                        int offset = 0;
                        foreach (byte[] current in _sendQueue)
                        {
                            Array.Copy(current, 0, toSend, offset, current.Length);
                            offset += current.Length;
                        }
                        _sendQueue.Clear();
                        _sendInProgress = true;
                        // Note:  BeginSend() might immediately call the callback in the same thread.  And the
                        // callback calls StartSend().  So this code is reentrant, and the lock doesn't
                        // prevent that.  By clearing the queue before calling BeginSend() we ensure that each
                        // item is only sent once.
                        _s.BeginSend(toSend, 0, toSend.Length, 0, new AsyncCallback(AsyncSendCallback), null);
                    }
                }
                catch
                {
                    _debugCount++;
                    // Ignore this case for simplicity.  Presumably we could get here if the socket was closed
                    // before we tried to write to it.  We might or might not catch that and change the state
                    // elsewhere, before this method started.  I'm not 100% about those strange cases.  It's
                    // tempting to ignore SocketException, and report anything and everything else as a
                    // possible programming error.
                }
            }
        }

        /// <summary>
        /// Completion callback for BeginSend().  Marks the send as finished and pushes out
        /// anything that was queued in the meantime.
        /// </summary>
        private void AsyncSendCallback(IAsyncResult ar)
        {
            try
            {
                Socket s = _s;
                if (s == null || !s.Connected)
                {
                    return;
                }
                s.EndSend(ar);
                lock (_sendQueue)
                {
                    _sendInProgress = false;
                }
                StartSend();
            }
            catch
            {
                // Deliberately not reported from here: this callback can run inside the send
                // lock (see StartSend), which is no place to invoke user callbacks.  A broken
                // socket is expected to surface through the pending receive instead.
            }
        }

        private readonly byte[] _readBuffer = new byte[2048];

        /// <summary>
        /// Post the next asynchronous receive.  If the receive cannot be posted the
        /// connection is closed, so the owner is told rather than left waiting forever.
        /// </summary>
        private void StartListening()
        {
            Socket s = _s;
            if (s == null)
            {
                // Already disconnected.  The close has been, or is being, reported.
                return;
            }
            try
            {
                s.BeginReceive(_readBuffer, 0, _readBuffer.Length, 0, new AsyncCallback(AsyncReadCallback), null);
            }
            catch
            {
                Disconnect();
            }
        }

        /// <summary>
        /// Completion callback for BeginReceive().  Hands the received bytes to the owner
        /// and posts the next receive.  A zero-length read, a dead socket, or a receive
        /// error closes the connection.
        /// </summary>
        private void AsyncReadCallback(IAsyncResult ar)
        {
            try
            {
                Socket s = _s;
                if (s == null)
                {
                    // Disconnect() already ran and reported the close.
                    return;
                }
                if (!s.Connected)
                {
                    // The connection dropped underneath us.  Report it.
                    Disconnect();
                    return;
                }

                int read = s.EndReceive(ar);
                if (read > 0)
                {
                    try
                    {
                        _connectionCallbacks.OnRead(_readBuffer, read, this);
                    }
                    catch
                    {
                        // A faulty callback must not stop the read loop.
                    }
                    StartListening();
                }
                else
                {
                    // Zero bytes means the other side closed the connection.
                    Disconnect();
                }
            }
            catch
            {
                // EndReceive failed, e.g. the connection was reset.  Without this the owner
                // would never hear that the connection is gone.
                Disconnect();
            }
        }

        private bool _closeSent = false;

        /// <summary>
        /// Report the close to the owner, at most once for the lifetime of this object.
        /// </summary>
        private void SendClose()
        {
            bool needToSend;
            lock (_sendQueue)
            {
                needToSend = !_closeSent;
                _closeSent = true;
            }
            if (needToSend)
            {
                try
                {
                    _connectionCallbacks.OnClosed(this);
                }
                catch
                {
                    // A faulty callback must not break the caller of Disconnect().
                }
            }
        }
    }
}