using System; using System.Collections.Generic; using System.Diagnostics; using System.Linq; using System.Net.WebSockets; using System.Text; using System.Threading; using System.Threading.Tasks; /* This new preferred connection method. * This is much more efficient than our older HTTPS proxy. * It's almost as efficient as the TCP/IP mode. * This works over HTTPS so it goes through most firewalls. * This works over HTTP or HTTPS so you can add encryption. * * Current status: * Seems to be working! * I need to investigate at least one strange exception. It only happened once. * System.Net.WebSockets.WebSocketException (0x80004005): An internal WebSocket error occurred. Please see the innerException, if present, for more details. ---> System.Net.Sockets.SocketException (0x80004005): The I/O operation has been aborted because of either a thread exit or an application request * This came from await ClientWebSocket.ReceiveAsync() * * How to use right now: * Download the server from git and RUN IT YOURSELF. See ti-git:user-defined/http-proxy-server * Add this to the section of your Custom4.xml file. * * * Future plans: * Set up a permanant server for this. * Test more. * Add to the XML file as an option. * Test more. * Make it the default. * Test more. * Remove the HTTP mode completely. * Remove the TCP/IP mode from the customer view, but it's allowed in Custom4.xml, for debugging. * * Issue -- We keep disconnecting and reconnecting. This is what I see in the server logs. * 💔 shutDown(Graceful close from server., 1000) * /streaming/proxy.rb 2021-05-12T00:36:02.864Z * opened new web socket /streaming/proxy.rb dest=micro_proxy * Sometimes I see this every minute or so. The client automatically reconnects. If I wasn't * watching the logs closely, I might have missed it. * Sometimes I see this a bunch of times in a row, repeating twice a second. The client shows * lots of yellow tick marks. That will last for a minute or so, before it goes back to * green. Restarting the server does *not* help. Restarting the client when it was yellow * typically makes it start green. * Sometimes when I first start the client I never get any green tick marks. The yellow tick * marks seem to contiue forever. When I restart the client it works better. * This mostly suggests a problem on the client side, but the server log make that less certain. * This might or might not be related to the System.Net.Sockets.SocketException described above. * It seems like I'm seeing that more now. */ namespace TradeIdeas.ServerConnection { class WebSocketConnection : IConnection { private readonly ClientWebSocket ClientWebSocket = new ClientWebSocket(); private readonly CancellationTokenSource CancellationTokenSource = new CancellationTokenSource(); private readonly HttpOptions options; private void ShowThreadId(String msg) { Thread currentThread = Thread.CurrentThread; Debug.WriteLine(msg + " ✦ thread id=" + currentThread.ManagedThreadId); // currentThread.Name was null every time I checked. /* Interesting: * When connecting we go to sleep in one thread and wake up in another. * We get two calls to Send() before the connect finishes. * We get calls to Send() from at least two different threads. * * in WebSocketConnection.Connect() before first await ✦ thread id=3 * ✹✹✹ BEFORE * in WebSocketConnection.Send() before await ✦ thread id=3 * in WebSocketConnection.Send() before await ✦ thread id=3 * ✹✹✹ AFTER * in WebSocketConnection.Connect() top of loop ✦ thread id=9 * in WebSocketConnection.Send() before await ✦ thread id=7 * Exception thrown: 'System.Net.WebSockets.WebSocketException' in mscorlib.dll * * Send() typically goes to sleep and wakes up in the same thread, but not always. * in WebSocketConnection.Send() before await ✦ thread id=7 * in WebSocketConnection.Send() after await ✦ thread id=7 * in WebSocketConnection.Send() before await ✦ thread id=7 * in WebSocketConnection.Send() after await ✦ thread id=13 */ } public WebSocketConnection(HttpOptions options) { this.options = options; Debug.Assert((null != options.InitialUrl) && ("" != options.InitialUrl) && (null != options.ConnectionCallbacks)); ShowThreadId("in WebSocketConnection.WebSocketConnection()"); } private Task InitialConnection; public async void Connect() { //ShowThreadId("in WebSocketConnection.Connect() before first await"); try { Uri uri = new Uri(options.InitialUrl); //Debug.WriteLine("✹✹✹ BEFORE"); InitialConnection = ClientWebSocket.ConnectAsync(uri, CancellationTokenSource.Token); await InitialConnection; if (ClientWebSocket.State == WebSocketState.Open) options.ConnectionCallbacks.OnConnected(this); //Debug.WriteLine("✹✹✹ AFTER"); while (ClientWebSocket.State < WebSocketState.CloseSent) // && ! DisconnectStarted { //ShowThreadId("in WebSocketConnection.Connect() top of loop"); byte[] buffer = new byte[8000]; ArraySegment arraySegment = new ArraySegment(buffer); WebSocketReceiveResult webSocketReceiveResult = await ClientWebSocket.ReceiveAsync(arraySegment, CancellationTokenSource.Token).ConfigureAwait(false); if (webSocketReceiveResult.Count < 1) Debug.WriteLine("*** webSocketReceiveResult.Count=" + webSocketReceiveResult.Count); if ((webSocketReceiveResult.MessageType != WebSocketMessageType.Text) && (webSocketReceiveResult.MessageType != WebSocketMessageType.Binary)) Debug.WriteLine("*** webSocketReceiveResult.MessageType=" + webSocketReceiveResult.MessageType); if (webSocketReceiveResult.CloseStatus.HasValue) Debug.WriteLine("*** webSocketReceiveResult.CloseStatus=" + webSocketReceiveResult.CloseStatus.Value); if (webSocketReceiveResult.CloseStatusDescription != null) Debug.WriteLine("*** webSocketReceiveResult.CloseStatusDescription=" + webSocketReceiveResult.CloseStatusDescription); if (webSocketReceiveResult.Count < 1) Debug.WriteLine("*** webSocketReceiveResult.Count=" + webSocketReceiveResult.Count); else options.ConnectionCallbacks.OnRead(buffer, webSocketReceiveResult.Count, this); /* This is what it looks like when I use http://127.0.0.1:9000/static/status.html to force the server to close the connection: * *** webSocketReceiveResult.MessageType=Close * *** webSocketReceiveResult.CloseStatus=4001 * *** webSocketReceiveResult.CloseStatusDescription=From status.ts * *** webSocketReceiveResult.Count=0 * in WebSocketConnection.Connect() before Disconnect() ✦ thread id=9 * This works more or less as expected. */ } } catch (OperationCanceledException) { // TODO this should probably not print anything. It's normal. Debug.WriteLine("WebSock read canceled."); } catch (Exception ex) { Debug.WriteLine(ex); options.ConnectionCallbacks.OnError(ex); } ShowThreadId("in WebSocketConnection.Connect() before Disconnect()"); Disconnect(); } private readonly Object Mutex = new Object(); /// /// This might be null! See DisconnectStarted. /// You should only access this while holding the mutex. /// private List OutgoingQueue = new List(); /// /// You can only make one call at a time to ClientWebSocket.SendAsync(). /// If you want to send output and this is true, throw your data in the queue and that task will eventually get to it. /// If this is false, you will have to start a new task. /// You should only access this while holding the mutex. /// private bool OutputTaskIsRunning = false; /// /// This will only change while someone is holding the mutex. /// It is safe to read this at any time, regardless of the mutex. /// This can only change from false to true. /// private bool DisconnectStarted { get { return OutgoingQueue == null; } } public void Disconnect() { //ShowThreadId("in WebSocketConnection.Disconnect()"); lock (Mutex) { if (DisconnectStarted) return; OutgoingQueue = null; //Assert(DisconnectStarted); } try { CancellationTokenSource.Cancel(); ClientWebSocket.Abort(); } catch (Exception ex) { Debug.WriteLine(ex); } try { options.ConnectionCallbacks.OnClosed(this); } catch { } } private static T[] ConcatArrays(List arrays) { // Based on https://stackoverflow.com/a/55578088/971955 if (arrays == null) throw new ArgumentNullException(); if (arrays.Count == 1) // Optimization. If there was only one, just copy it. We assume these arrays are read only. return arrays[0]; var offset = 0; var newLength = arrays.Sum(arr => arr.Length); var newArray = new T[newLength]; foreach (var arr in arrays) { Buffer.BlockCopy(arr, 0, newArray, offset, arr.Length); offset += arr.Length; } return newArray; } public void Send(List messages) { Send(ConcatArrays(messages)); } /// /// Use Interlocked.Increment() to access this. /// int lastSendId = 0; // TO SEND: // Grab the mutex // If we are going down, just return. // If a send is in progress, push new data to the end of the queue // Else mark in the object that a send is in progress, and mark locally that you need to start a send immediately. // release the mutex // If we need to start a new send: // while true // Write the current data to the websocket. // await completion. // grab the mutex. // If we are going down // return. // else If there is no more work to do // clear the send in progress flag // break; // else // takes the current contents from the queue // release the mutex // end while public async void Send(byte[] message) { int sendId = Interlocked.Increment(ref lastSendId); string debugMessage = "in WebSocketConnection.Send(#" + sendId + ", [" + string.Join(", ", message) + "])"; //ShowThreadId("in WebSocketConnection.Send(#" + sendId + ") before await"); lock (Mutex) { if (null == OutgoingQueue) // This object is shutting down. Ignore any new requests. return; if (OutputTaskIsRunning) { // Someone will get to this soon. OutgoingQueue.Add(message); //ShowThreadId("in WebSocketConnection.Send(#" + sendId + ") adding to queue"); ShowThreadId(debugMessage + " adding to queue"); return; } OutputTaskIsRunning = true; } // ShowThreadId(debugMessage + " starting immediately"); try { await InitialConnection; while (true) { await ClientWebSocket.SendAsync(new ArraySegment(message), WebSocketMessageType.Binary, true, CancellationTokenSource.Token); //ShowThreadId("in WebSocketConnection.Send(#" + sendId + ") after await"); lock (Mutex) { if (null == OutgoingQueue) // This object is shutting down. Ignore any new requests. return; if (OutgoingQueue.Count == 0) { OutputTaskIsRunning = false; //ShowThreadId("in WebSocketConnection.Send(#" + sendId + ") done"); return; } // Grab all the data in the queue and try to process it all at once. Each message is just // a bunch of bytes. Any boundaries between one array and the next are totlly arbitrary and // do not need to be preserved. message = ConcatArrays(OutgoingQueue); OutgoingQueue.Clear(); } } } catch (Exception ex) { Debug.WriteLine(ex); Disconnect(); } } } }