using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using zlib;
using TradeIdeas.MiscSupport;
//using ComponentAce.Compression.Libs.zlib;
using TradeIdeas.Logging;
//using System.IO;
using System.Diagnostics;

namespace TradeIdeas.ServerConnection
{
    /// <summary>
    /// The TalkWithServer class communicates with the server.
    ///
    /// This unit only receives messages from the main program and sends them to
    /// the server, and vice versa.  It knows nothing about the content.
    /// This unit is responsible for marshaling and unmarshaling the messages.  It
    /// takes in messages in the form of Dictionary(String, byte[]).  See
    /// SendGoodResponse for the return message format.  This unit is also
    /// responsible for compression.
    ///
    /// The outgoing message usually maps human readable strings to human readable
    /// strings.  However, it doesn't have to.  The only real limits are
    /// 1) The key cannot be "".
    /// 2) The key "message_id" is reserved, and is set by this class.
    /// 3) Keys cannot be duplicated.
    /// We accept the key as a string because, in practice, it's always a simple
    /// string.  We use the ASCII encoding on it.  The data is usually a simple
    /// string, but not in the proxy unit.  That unit encodes one or more fields
    /// in a record as raw bytes.  (I.e. a long is encoded as 8 bytes, least
    /// significant byte first.)  This format was chosen because it was very
    /// fast in C++ and Pascal.  We could just use a type cast to convert a
    /// record into an array of bytes.  The C# version is able to talk in this
    /// protocol, although I'm not sure how efficient the code is!
    ///
    /// This automatically combines multiple messages going from the client to the
    /// server for the sake of efficiency.  See FlushCompressionBufferSoon() for
    /// more info.  The algorithm is slightly different from the Delphi version,
    /// because we use threads in a completely different way.
    ///
    /// Notes on threading:  The callbacks from this object can come from any
    /// thread.  In the case of certain errors, the callback will come immediately,
    /// in the same thread, before the function returns.
    ///
    /// Callbacks initiating from the server are serialized.  You will not get more
    /// than one callback at a time.  If the callback takes a long time to complete,
    /// that will slow down the other callbacks, and other aspects of this object.
    ///
    /// This object is thread safe.  All public methods can be called from any
    /// thread.  When you get a callback no mutexes are active.
    /// </summary>
    public class TalkWithServer : ConnectionCallbacks
    {
        // The object starts with the default value _status = scsNew.  OnConnected()
        // moves it to scsConnected and ConnectionDown() moves it to scsDisconnected.

        /// <summary>
        /// For simplicity you can never reuse a connection.  When a
        /// connection fails you can throw it away and start with a new
        /// one.  That way it is perfectly clear when all the details
        /// are reset.
        /// </summary>
        public enum Status
        {
            /// <summary>
            /// Not initialized yet.
            /// </summary>
            scsNew,

            /// <summary>
            /// Data is flowing
            /// </summary>
            scsConnected,

            /// <summary>
            /// No longer can be used.
            /// </summary>
            scsDisconnected
        }

        /// <summary>
        /// There are several callbacks.  Any of them could happen in any thread.
        /// We group them all in a single interface so that we can easily set
        /// them in the constructor, and never change them again.  Changing a
        /// callback at other times would be complicated with the multiple
        /// threads.
        /// </summary>
        public interface IListener
        {
            /// <summary>
            /// These are events that are specific to the entire connection.
            /// Individual replies from the server go directly to whoever requested
            /// them and use the <see cref="Response"/> delegate.
            /// </summary>
            /// <param name="errorMessage">Description of the failure (currently always "").</param>
            void OnDisconnected(String errorMessage);

            /// <summary>
            /// An error was worth reporting to the user.  Aside from reporting,
            /// we are taking care of this ourselves.  Traditionally this only came
            /// from the HTTP connection.
            /// </summary>
            /// <param name="errorMessage">Text supplied by the connection.</param>
            void OnAutoRetry(String errorMessage);

            /// <summary>
            /// Called before each per-request callback.  body is null on error.
            /// </summary>
            /// <param name="args">The response and, when known, the request that caused it.</param>
            void OnMessagePreview(PreviewArgs args);
        }

        // Throttles for logging.  These are best-effort counters shared by all
        // instances; they are not synchronized.
        private static int _errorCounter;
        private static int _warningCounter;
        private static int _sessionId = 0;
        private const int MAX_ERRORS_TO_LOG = 20;
        private const int MAX_WARNINGS_TO_LOG = 20;

        /// <summary>
        /// Used by <see cref="IListener.OnMessagePreview"/>.
        /// </summary>
        public struct PreviewArgs
        {
            /// <summary>
            /// Null on error.  Otherwise it's what we got from the server.
            /// </summary>
            public Byte[] body;

            /// <summary>
            /// This is only meaningful if originalRequest is not null.
            /// </summary>
            public DateTime originalTimestamp;

            /// <summary>
            /// This might be null.
            /// </summary>
            public Dictionary<string, byte[]> originalRequest;

            /// <summary>
            /// A value with a null body, a null request and DateTime.MinValue as the timestamp.
            /// </summary>
            public static PreviewArgs Empty
            {
                get
                {
                    // Ideally the timestamp and the originalRequest would
                    // always be filled in.  But that would require some changes
                    // to the way we do the abort code.  And at the moment I'm
                    // not worried about the abort code.
                    PreviewArgs result;
                    result.body = null;
                    result.originalTimestamp = DateTime.MinValue;
                    result.originalRequest = null;
                    return result;
                }
            }
        }

        /// <summary>
        /// Create one of these if you want to call SendMessage() and possibly
        /// call Cancel() later.  This is not necessary if you never plan to
        /// call Cancel().
        ///
        /// Do not try to reuse one of these objects!
        /// </summary>
        public class CancelToken
        {
            private bool _cancelled;

            // Everything needed to cancel one request: the owner and the message id.
            private class Info
            {
                private readonly TalkWithServer _talkWithServer;
                private readonly Int64 _id;

                public void Cancel()
                {
                    _talkWithServer.CancelMessage(_id);
                }

                public Info(TalkWithServer talkWithServer, Int64 id)
                {
                    _talkWithServer = talkWithServer;
                    _id = id;
                }
            }

            private Info _info;

            /// <summary>
            /// If you called TalkWithServer.SendMessage() directly, you can call
            /// this Cancel() method to cancel any further callbacks.  If you
            /// used SendManager.SendMessage(), then you should use
            /// SendManager.Cancel() to cancel the callbacks.
            /// </summary>
            public void Cancel()
            {
                // Publish the flag before looking at _info.  Attach() does the
                // mirror image, so at least one side sees the other.
                _cancelled = true;
                Thread.MemoryBarrier();
                if (null != _info)
                    _info.Cancel();
            }

            /// <summary>
            /// This is aimed at TalkWithServer.SendMessage().
            ///
            /// To cancel a request you need certain information, specifically the
            /// TalkWithServer object and the message id number.  However, the
            /// message id number isn't known until the middle of SendMessage().
            /// So SendMessage() calls this.
            ///
            /// Note that this might immediately call TalkWithServer.CancelMessage().
            ///
            /// This is thread safe.
            /// </summary>
            /// <param name="talkWithServer">The object that manages this message.</param>
            /// <param name="id">The id number generated by TalkWithServer, sent to the server,
            /// and sent back with the reply.</param>
            internal void Attach(TalkWithServer talkWithServer, Int64 id)
            {
                System.Diagnostics.Debug.Assert(null == _info, "Cannot reuse a TalkWithServer.CancelToken.");
                _info = new Info(talkWithServer, id);
                Thread.MemoryBarrier();
                if (_cancelled)
                    _info.Cancel();
            }

            /// <summary>
            /// True if the message has been canceled.  This can change at any time from any thread.
            /// </summary>
            public bool IsCanceled
            {
                get { return _cancelled; }
            }
        }

        /// <summary>
        /// This is the response from the server to an individual request.
        /// Body is null on error.
        /// </summary>
        /// <param name="body">The bytes received from the server, or null on error.</param>
        /// <param name="clientId">Whatever the client specified when he sent the request.</param>
        public delegate void Response(byte[] body, object clientId);

        /// <summary>
        /// Convenience wrapper around <see cref="CreateMessageFromArray"/>.
        /// </summary>
        /// <param name="items">Alternating names and values.</param>
        /// <returns>The message, or null if the number of items is odd.</returns>
        public static Dictionary<string, byte[]> CreateMessage(params Object[] items)
        {
            return CreateMessageFromArray(items);
        }

        private static readonly byte[] TRUE_AS_BYTES = Encoding.ASCII.GetBytes("1");
        private static readonly byte[] FALSE_AS_BYTES = Encoding.ASCII.GetBytes("0");

        /// <summary>
        /// Build a message from alternating (name, value) items.  byte[] values are
        /// used as is.  DateTime and double go through ServerFormats, bool becomes
        /// "1" or "0", and anything else is value.ToString() encoded as UTF-8.
        /// </summary>
        /// <param name="items">Alternating names (strings) and values.</param>
        /// <returns>The message, or null if the number of items is odd.</returns>
        public static Dictionary<string, byte[]> CreateMessageFromArray(Object[] items)
        {
            if ((items.Length % 2) != 0)
                return null;
            Dictionary<string, byte[]> result = new Dictionary<string, byte[]>();
            for (int i = 0; i < items.Length; i += 2)
            {
                string name = (string)items[i];
                object value = items[i + 1];
                if ((name == null) || (value == null))
                    // We allow this as a convenience.  Sometimes it's easier
                    // to call this function once, with all possible fields,
                    // even though some fields don't matter.
                    continue;
                byte[] bytes = value as byte[];
                if (bytes == null)
                {
                    if (value is DateTime)
                    {
                        long asTimeT = ServerFormats.ToTimeT((DateTime)value);
                        bytes = Encoding.ASCII.GetBytes(asTimeT.ToString());
                    }
                    else if (value is double)
                    {
                        bytes = Encoding.ASCII.GetBytes(ServerFormats.ToString((double)value));
                    }
                    else if (value is bool)
                    {
                        bytes = ((bool)value) ? TRUE_AS_BYTES : FALSE_AS_BYTES;
                    }
                    else
                    {
                        bytes = Encoding.UTF8.GetBytes(value.ToString());
                    }
                }
                result[name] = bytes;
            }
            return result;
        }

        // This object will be used any time we want to lock the entire object.
        //
        // I looked at more precise locks.
        // For simplicity I eventually decided to have only one lock.
        //
        // We could always lock "this".  That was considered bad style in some
        // places.  It was better to lock a private object.
        //
        // The actual object in this variable was chosen somewhat arbitrarily.
        // It was just an object that was private and not expected to be locked
        // by anyone else.
        private Object _lock;

        private IConnection _connection;

        /// <summary>
        /// Mark the connection as dead, tell the listener, and send an error
        /// response to every request that is still outstanding.  Only the first
        /// call does any work.
        /// </summary>
        private void ConnectionDown()
        {
            // IReadOnlyCollection would be ideal here.  But that's not available until
            // .NET 4.5.  A lot of our code uses .NET 4.5, but not this library.
            // We used to use IEnumerable here.  That doesn't have the Count property.
            Dictionary<Int64, OutstandingRequest>.ValueCollection outstandingRequests;

            // We have to use a lock here to keep the _status and the
            // _outstandingRequests consistent.  The rule is that every request will
            // get a callback when it is fulfilled or when the connection is broken.
            // When we add a new request, we might see that the connection is down,
            // and immediately do the callback.  We should send exactly one callback,
            // either from there or from here.
            lock (_lock)
            {
                _status = Status.scsDisconnected;
                if (_outstandingRequests == null)
                    // Already down!  Outstanding requests is already null so we
                    // can't even try to do anything.
                    return;
                outstandingRequests = _outstandingRequests.Values;
                _outstandingRequests = null;
            }
            System.Diagnostics.Debug.WriteLine(DateTime.Now.ToShortTimeString()
                + " Connection to proxy broken, all " + outstandingRequests.Count
                + " pending requests canceled.");

            // Don't make any callbacks inside of a lock.
            IListener miscCallbacks = _miscCallbacks;
            if (miscCallbacks != null)
            {
                try
                {
                    miscCallbacks.OnDisconnected(""); // TODO need error message.
                }
                catch { }
            }
            foreach (OutstandingRequest request in outstandingRequests)
            {
                if ((request != null) && (request.callback != null))
                    SendErrorResponse(request.clientId, request.callback, request.streaming);
            }
        }

        void ConnectionCallbacks.OnClosed(IConnection connection)
        {
            if ((_status == Status.scsConnected) && (_warningCounter < MAX_WARNINGS_TO_LOG))
            {
                TILogger.Warning("Connection Closed");
                _warningCounter++;
            }
            // This is a callback from the Connection.
            ConnectionDown();
        }

        void ConnectionCallbacks.OnRead(byte[] read, int count, IConnection connection)
        {
            _decompressor.next_in = read;
            _decompressor.next_in_index = 0;
            _decompressor.avail_in = count;
            if (_decompressor.next_out == null)
            {
                _decompressor.next_out = new byte[1024];
                _decompressor.avail_out = _decompressor.next_out.Length;
            }

            // True when the previous inflate() stopped because it ran out of
            // output space.  In that case there may be more output pending even
            // though all of the input has been consumed, so we have to call
            // inflate() again.  If there was nothing more, that extra call
            // reports Z_BUF_ERROR, which is not a real error.
            bool outputBufferFull = false;
            string failure = null;
            while (((_decompressor.avail_in > 0) || outputBufferFull) && (failure == null))
            {
                if (_decompressor.avail_out < 100)
                {
                    // Double the buffer.  The new half is all free space.
                    _decompressor.avail_out += _decompressor.next_out.Length;
                    Array.Resize(ref _decompressor.next_out, _decompressor.next_out.Length * 2);
                }
                int result = _decompressor.inflate(zlibConst.Z_SYNC_FLUSH);
                bool succeeded = (result == zlibConst.Z_OK)
                    || (outputBufferFull && (result == zlibConst.Z_BUF_ERROR));
                if (!succeeded)
                {
                    failure = "Decompressor Error";
                }
                else
                {
                    // Look at avail_out before CheckForCompressedMessage(),
                    // because that gives space back to the buffer.
                    outputBufferFull = (_decompressor.avail_out == 0);
                    if (!CheckForCompressedMessage())
                        failure = "Corrupt message header";
                }
            }
            if (failure != null)
            {
                if (_warningCounter < MAX_WARNINGS_TO_LOG)
                {
                    TILogger.Warning(message: failure);
                    _warningCounter++;
                }
                _connection.Disconnect();
            }
        }

        // Outgoing to server
        //
        // We need a lock to access these variables.  If two threads try to send
        // a message at the same time, we need to send one complete message followed
        // by another.  We don't want the bytes of the two messages to be interleaved.

        // Already compressed, if required.  Not used by the code in this file.
        // NOTE(review): the element type was lost from the source; byte[] is a guess.
        private List<byte[]> _outgoingBuffer = new List<byte[]>();

        // Next thing to give to the compressor.  We don't give it up right away
        // because we don't know if we want to flush or not.
        private byte[] _toCompress;

        private ZStream _compressor = new ZStream();

        private void SendBytes(byte[] data)
        {
            _connection.Send(data);
        }

        /// <summary>
        /// Run data through the compressor and send whatever comes out.
        /// </summary>
        /// <param name="data">The bytes to compress.</param>
        /// <param name="flush">True to use Z_SYNC_FLUSH, false for Z_NO_FLUSH.</param>
        private void CompressAndSendBytes(byte[] data, bool flush)
        {
            _compressor.next_in = data;
            _compressor.next_in_index = 0;
            _compressor.avail_in = _compressor.next_in.Length;
            do
            {
                _compressor.next_out = new byte[10240];
                _compressor.next_out_index = 0;
                _compressor.avail_out = _compressor.next_out.Length;
                // Silently ignore any errors here.  We don't expect any real
                // problems here, and we don't want to worry about the special case
                // where we weren't sure if there was more in the compressor or
                // not.
                _compressor.deflate(flush ? zlibConst.Z_SYNC_FLUSH : zlibConst.Z_NO_FLUSH);
                if (_compressor.next_out_index > 0)
                {
                    if (_compressor.next_out_index < _compressor.next_out.Length)
                    {
                        Array.Resize(ref _compressor.next_out, _compressor.next_out_index);
                    }
                    // As soon as the compressor spits out data, send it to the
                    // network connection.  Let the network connection handle any
                    // further buffering.  We do this mostly for simplicity.  But
                    // there was also a strange case in the Delphi code where the
                    // client was busy and was constantly writing to a buffer for a
                    // long time, and eventually the server would time out.
                    SendBytes(_compressor.next_out);
                }
            }
            while ((_compressor.avail_in > 0) || (_compressor.avail_out == 0));
        }

        /// <summary>
        /// Queue data for the compressor.  The previously queued piece, if any,
        /// is compressed now without a flush.  Empty arrays are ignored.
        /// </summary>
        private void CompressAndSendBytes(byte[] data)
        {
            if (data.Length == 0)
                return;
            if (_toCompress != null)
                CompressAndSendBytes(_toCompress, false);
            _toCompress = data;
        }

        private void CompressAndSendInteger(int i)
        {
            byte[] data = new byte[4];
            // Intel byte order.
            data[0] = (byte)i;
            data[1] = (byte)(i >> 8);
            data[2] = (byte)(i >> 16);
            data[3] = (byte)(i >> 24);
            CompressAndSendBytes(data);
        }

        // A four byte length followed by the bytes.
        private void CompressAndSendWithSize(byte[] data)
        {
            CompressAndSendInteger(data.Length);
            CompressAndSendBytes(data);
        }

        private void CompressAndSendWithSize(String data)
        {
            CompressAndSendWithSize(Encoding.ASCII.GetBytes(data));
        }

        private void FlushCompressionBuffer()
        {
            if (_toCompress != null)
            {
                CompressAndSendBytes(_toCompress, true);
                _toCompress = null;
            }
        }

        // Attempt to defer the flush until after multiple messages have been added.  That will
        // make things more efficient.  If we can group multiple messages before a flush, we
        // will send fewer bytes and fewer packets.
        //
        // This will never work quite as well as the Delphi version.  That relied on the fact
        // that only the GUI thread could send messages.  All messages sent from one event (in
        // the GUI message pump) were serviced by a single flush.  This version will actually
        // be more efficient on a machine with one CPU than a machine with lots of CPUs.
        //
        // I looked at some alternative ways to do this.  If the worker thread were responsible
        // for marshalling the message, not just flushing the compression buffer, that would
        // slow down the rate at which we flushed messages, and speed up the thread that
        // generates messages.  That would possibly give us more messages per flush.  Also, if
        // there are multiple connections, each one would do a lot of work in its own thread.
        // So if some connections demanded more work than others, this might spread the work
        // out more fairly.
        //
        // A better answer might be to create a single thread which is responsible for all
        // reading and writing.  Anything that is currently protected by the _lock would only
        // be accessible from that thread.  Any attempt to send a message, disconnect, or
        // receive bytes from the server would be put into a queue.  We'd only flush when the
        // queue was empty.  That would allow us to accumulate a large number of messages.
        private volatile bool _flushIsPending = false;

        private void FlushCompressionBufferSoon()
        {
            lock (_lock)
            {
                if (!_flushIsPending)
                {
                    _flushIsPending = true;
                    ThreadPool.QueueUserWorkItem(new WaitCallback(FlushCompressionBufferCallback));
                }
            }
        }

        private void FlushCompressionBufferCallback(object state)
        {
            Thread.Sleep(0);
            lock (_lock)
            {
                try
                {
                    FlushCompressionBuffer();
                }
                finally
                {
                    // Always clear the flag.  Otherwise one failed flush would
                    // prevent any future flush from being scheduled.
                    _flushIsPending = false;
                }
            }
        }

        // Incoming from server.
        private ZStream _decompressor = new ZStream();

        // Size of the header in front of each incoming message:  an 8 byte
        // message id followed by a 4 byte body length.
        private const int INCOMING_HEADER_SIZE = 12;

        // Four bytes, least significant byte first.
        private static int UnmarshalInt(byte[] data, int start)
        {
            int result = (int)(data[start]);
            result += ((int)(data[start + 1])) << 8;
            result += ((int)(data[start + 2])) << 16;
            result += ((int)(data[start + 3])) << 24;
            return result;
        }

        // Eight bytes, least significant byte first.
        private static Int64 UnmarshalInt64(byte[] data, int start)
        {
            Int64 result = 0;
            for (int offset = 7; offset >= 0; offset--)
            {
                result <<= 8;
                result += data[start + offset];
            }
            return result;
        }

        /// <summary>
        /// Unmarshal as many complete messages as we can find in the
        /// decompressor's output buffer and send them all to the client.
        /// Whatever is left over is moved to the front of the buffer.
        /// </summary>
        /// <returns>
        /// False if a header contained an impossible length.  The stream can't
        /// be trusted after that, so the caller should drop the connection.
        /// </returns>
        private bool CheckForCompressedMessage()
        {
            // Integers are sent in the standard Intel byte order.
            // Each message starts with a message id.  This came from the message_id
            // field that we sent to the server.  This is sent as an 8 byte integer.
            // After that we receive the length of the message.  That is a 4 byte
            // integer.  Length describes the number of bytes.  Finally we receive
            // the body of the message.  This can be any string of bytes.  Most often
            // it is XML, but sometimes it contains binary data, including gif files.
            //
            // This code is not reentrant, nor is it protected by locks.  We assume
            // that the code which sends us new data (the Connection class) will not
            // send us more data until we finish the callback from the last data.
            bool headersValid = true;
            int consumed = 0;
            while (true)
            {
                int available = _decompressor.next_out_index - consumed;
                if (available < INCOMING_HEADER_SIZE)
                    break;
                Int64 id = UnmarshalInt64(_decompressor.next_out, consumed);
                int size = UnmarshalInt(_decompressor.next_out, consumed + 8);
                if ((size < 0) || (size > int.MaxValue - INCOMING_HEADER_SIZE))
                {
                    // A negative length, or one so large that adding the header
                    // size would overflow.
                    headersValid = false;
                    break;
                }
                int totalSize = size + INCOMING_HEADER_SIZE;
                if (available < totalSize)
                    break;
                byte[] body = new byte[size];
                Array.Copy(_decompressor.next_out, consumed + INCOMING_HEADER_SIZE, body, 0, size);
                SendGoodResponse(id, body);
                consumed += totalSize;
            }
            if (consumed > 0)
            {
                Array.Copy(_decompressor.next_out, consumed, _decompressor.next_out, 0,
                    _decompressor.next_out_index - consumed);
                _decompressor.next_out_index -= consumed;
                _decompressor.avail_out += consumed;
            }
            return headersValid;
        }

        // Status to client.
        private Status _status;

        /// <summary>
        /// Warning:  This can change at any time.
        /// </summary>
        private IListener _miscCallbacks;

        private class OutstandingRequest
        {
            public Response callback;
            public bool streaming;
            public object clientId;
            public readonly DateTime timestamp = DateTime.Now;
            public Dictionary<string, byte[]> originalRequest;
        }

        // _outstandingRequests must be protected by the lock.
        // It is set to null when the connection goes down.
        private Dictionary<Int64, OutstandingRequest> _outstandingRequests =
            new Dictionary<Int64, OutstandingRequest>();

        /// <summary>
        /// Report a failure:  an empty preview to the listener (unless the request
        /// was streaming) and then a null body to the request's own callback.
        /// Exceptions thrown by either callback are swallowed.
        /// </summary>
        private void SendErrorResponse(object clientId, Response callback, bool streaming)
        {
            // We don't display the error for streaming messages.  This is a little
            // ugly.  We're trying to solve a specific problem.  The server can
            // disconnect us in a timeout.  This happens when we are already stopped,
            // and not really expecting data.  In this case we don't want to see a
            // yellow tickmark because that implies that there was a problem and we
            // are going to retry.  Yuck!
            IListener miscCallbacks = _miscCallbacks;
            if ((miscCallbacks != null) && (!streaming))
            {
                try
                {
                    miscCallbacks.OnMessagePreview(PreviewArgs.Empty);
                }
                catch { }
            }
            if (callback != null)
            {
                try
                {
                    callback(null, clientId);
                }
                catch (Exception e)
                {
                    // The callback is not our code.  Don't let it break us.
                    System.Diagnostics.Debug.WriteLine(e.Message + e.StackTrace);
                }
            }
        }

        private readonly char[] NEWLINE_ONLY = new char[] { '\n' };

        /// <summary>
        /// Handle a message with id -1.  The body is a list of lines:  the first
        /// message id, the last message id, then zero or more command names.
        /// Every outstanding request whose id is in that range and whose "command"
        /// field is one of those names is removed and gets an error response.
        /// </summary>
        private void OneServerDisconnected(byte[] body)
        {
            string disconnectMessage = BytesToStr(body);
            string[] pieces = disconnectMessage.Split(NEWLINE_ONLY);
            if (pieces.Length < 2)
            {
                System.Diagnostics.Debug.Write("Invalid server disconnected message.  Not enough entries.  " + disconnectMessage);
                return;
            }
            Int64 firstId;
            if (!Int64.TryParse(pieces[0], out firstId))
            {
                System.Diagnostics.Debug.Write("Invalid server disconnected message.  Can't parse first id.  " + disconnectMessage);
                return;
            }
            Int64 lastId;
            if (!Int64.TryParse(pieces[1], out lastId))
            {
                System.Diagnostics.Debug.Write("Invalid server disconnected message.  Can't parse last id.  " + disconnectMessage);
                return;
            }
            HashSet<string> commandNames = new HashSet<string>();
            for (int i = 2; i < pieces.Length; i++)
            {
                string next = pieces[i];
                if (next != "")
                    commandNames.Add(next);
            }

            SortedDictionary<Int64, OutstandingRequest> toReport =
                new SortedDictionary<Int64, OutstandingRequest>();
            lock (_lock)
            {
                if (null == _outstandingRequests)
                    return;
                foreach (var kvp in _outstandingRequests)
                {
                    if ((kvp.Key < firstId) || (kvp.Key > lastId))
                        continue;
                    byte[] command;
                    if (kvp.Value.originalRequest.TryGetValue("command", out command)
                        && commandNames.Contains(BytesToStr(command)))
                        toReport[kvp.Key] = kvp.Value;
                }
                // Remove in a second pass.  We can't modify the dictionary while
                // enumerating it.
                foreach (var kvp in toReport)
                    _outstandingRequests.Remove(kvp.Key);
            }

            StringBuilder debugMessage = new StringBuilder();
            debugMessage.Append(DateTime.Now.ToShortTimeString());
            debugMessage.Append(" One server disconnected.  First id = ");
            debugMessage.Append(firstId);
            debugMessage.Append(", last id = ");
            debugMessage.Append(lastId);
            debugMessage.Append(", found ");
            debugMessage.Append(toReport.Count);
            debugMessage.Append(" messages, commands = (");
            bool first = true;
            foreach (string command in commandNames)
            {
                if (first)
                    first = false;
                else
                    debugMessage.Append(", ");
                debugMessage.Append(command);
            }
            debugMessage.Append(").");
            System.Diagnostics.Debug.WriteLine(debugMessage.ToString());

            // Callbacks are made after releasing the lock.
            foreach (var kvp in toReport)
                SendErrorResponse(kvp.Value.clientId, kvp.Value.callback, kvp.Value.streaming);
        }

        /// <summary>
        /// Deliver one message from the server.  Id -1 is handled by
        /// OneServerDisconnected().  Any other id is looked up in the outstanding
        /// requests; unknown ids are ignored.  A request that is not streaming is
        /// removed as soon as its first response arrives.
        /// </summary>
        private void SendGoodResponse(Int64 msgId, byte[] body)
        {
            Debug.WriteLine($"{DateTime.Now:HH:mm.ss.fff}{Environment.NewLine}{Encoding.UTF8.GetString(body)}{Environment.NewLine}{Environment.NewLine}");
            if (msgId == -1)
            {
                // RA: We used to log a warning here.  There were too many of these
                // as indicated in Loggly, so that is turned off for now.
                OneServerDisconnected(body);
                return;
            }
            OutstandingRequest request = null;
            lock (_lock)
            {
                if (_outstandingRequests == null)
                    // This might be possible.  We are trying to shut down, but we
                    // still got some data after that.  That shouldn't happen, but
                    // might be an artifact of the multiple threads.
                    return;
                _outstandingRequests.TryGetValue(msgId, out request);
                if (request == null)
                    return;
                if (!request.streaming)
                    _outstandingRequests.Remove(msgId);
            }
            // Do the callbacks without holding any locks.  We were getting into
            // trouble with deadlock when we tried to hold the lock during the
            // callback.  At one time (in the early Delphi days) we tried to hold
            // the lock so we could do a better job canceling requests.
            if (request.callback == null)
                return;
            IListener miscCallbacks = _miscCallbacks;
            if (miscCallbacks != null)
            {
                try
                {
                    PreviewArgs args;
                    args.body = body;
                    args.originalTimestamp = request.timestamp;
                    args.originalRequest = request.originalRequest;
                    miscCallbacks.OnMessagePreview(args);
                    if ((body == null) && (_warningCounter < MAX_WARNINGS_TO_LOG))
                    {
                        TILogger.Warning(message: $"MessageId:{msgId}, Null Body");
                        _warningCounter++;
                    }
                }
                catch { }
            }
            try
            {
                request.callback(body, request.clientId);
            }
            catch (Exception e)
            {
                // The callback is not our code.  Don't let it break us.
                System.Diagnostics.Debug.WriteLine(e.StackTrace);
            }
        }

        void ConnectionCallbacks.OnAutoRetry(IConnection source, String msg)
        {
            IListener miscCallbacks = _miscCallbacks;
            if (miscCallbacks != null)
            {
                try
                {
                    miscCallbacks.OnAutoRetry(msg);
                }
                catch { }
            }
        }

        // public

        /// <summary>
        /// Communicate with server
        /// </summary>
        /// <param name="connectionFactory">Creates the connection that this object will use.</param>
        /// <param name="miscCallbacks">Receives the connection level events.  Can be null.</param>
        public TalkWithServer(IConnectionFactory connectionFactory, IListener miscCallbacks)
        {
            // Objects can be created in more than one thread.
            Interlocked.Increment(ref _sessionId);
            _lock = _decompressor;
            _compressor.deflateInit(5);
            _decompressor.inflateInit();
            _miscCallbacks = miscCallbacks;
            _connection = connectionFactory.CreateConnection(this);
            TIUserSession.ConnectionType = _connection.GetType().Name;
        }

        /// <summary>
        /// This was separated from the constructor so you'd have time to store this object in a variable.
        /// </summary>
        public void Connect()
        {
            _connection.Connect();
            // These two commands are sent uncompressed.
            SendBytes(Encoding.ASCII.GetBytes("command=set_output&mode=zlib64\r\ncommand=set_input&mode=zlib\r\n"));
        }

        // True for the fields that actually get sent.  We skip the empty key, the
        // reserved key "message_id", and null values.  Sending a null value would
        // throw halfway through marshaling a message.
        private static bool IsSendableField(KeyValuePair<string, byte[]> pair)
        {
            return (!String.IsNullOrEmpty(pair.Key))
                && (pair.Key != "message_id")
                && (pair.Value != null);
        }

        private static bool ValidMessage(Dictionary<string, byte[]> message)
        {
            // We skip/ignore certain types of fields.  There must be at least
            // one field that we don't skip or ignore.
            if (message == null)
                return false;
            foreach (KeyValuePair<string, byte[]> pair in message)
                if (IsSendableField(pair))
                    return true;
            return false;
        }

        /// <summary>
        /// Get status of connection
        /// </summary>
        /// <returns>The current status.  This can change at any time.</returns>
        public Status GetStatus()
        {
            return _status;
        }

        /// <summary>
        /// The last id handed out.  This is shared by all instances, but _lock is
        /// different for each instance, so the lock alone does not protect this.
        /// Always change it with Interlocked.Increment(), and do that while holding
        /// _lock so ids within one connection are registered in order.
        ///
        /// 0 is always reserved.  If we don't expect a response from a message,
        /// we give it an id of 0.
        /// </summary>
        private static Int64 _lastMessageId = 0;

        /// <summary>
        /// Call this if you are no longer interested in callbacks for a message.
        ///
        /// Note:  This does not guarantee that responses will stop immediately.
        /// A callback might still occur.  This is a multithreaded library, and
        /// the callbacks are not called from a lock.
        ///
        /// Note:  This does not notify the server.  Many server requests have
        /// corresponding requests to cancel them.
        ///
        /// The purpose of this method is to remove unused memory from the
        /// TalkWithServer object.  Each time you send a request with a callback
        /// that allocates some memory to handle the callback.
        ///
        /// Use CancelToken if you need to cancel a message.  CancelToken will
        /// call this function.  CancelToken will also take care of other
        /// details, like pointing to the right TalkWithServer object, and
        /// managing some multi-threaded issues.
        /// </summary>
        /// <param name="messageId">The id assigned in SendMessage().</param>
        private void CancelMessage(Int64 messageId)
        {
            lock (_lock)
            {
                if (null != _outstandingRequests)
                    _outstandingRequests.Remove(messageId);
            }
        }

        // On error, this will immediately call the callback.  Same thread, even
        // before returning from this function.  Errors can also come back in
        // another thread, for example if we are disconnected before receiving
        // the response.

        /// <summary>
        /// Send a message and optionally expect a stream of responses.  If
        /// streaming is true, we will continue to listen for responses until
        /// the connection is broken.  If response is null, no response is
        /// expected; if the server sends one, it is ignored.
        /// </summary>
        /// <param name="message">Send this to the server.  Fields with an empty key,
        /// the key "message_id", or a null value are not sent.</param>
        /// <param name="response">Callback when we get a response.  Can be null if you don't want a response.</param>
        /// <param name="streaming">
        /// True if we're expecting more than one response.  Otherwise the callback
        /// will be removed as soon as we get the first response.  This is not used if
        /// response is null.
        /// </param>
        /// <param name="clientId">
        /// This is returned as part of the callback.  It can be anything.
        /// This is not used if response is null.
        /// </param>
        /// <param name="cancelToken">
        /// This should be null if you have no intention of ever canceling the message.
        /// Otherwise the CancelToken will be associated with the message.  Use the
        /// CancelToken to cancel the message, when you don't want any more responses.
        /// Note that you can cancel a message before, during or after this call to
        /// TalkWithServer.SendMessage().  Sending and canceling are both thread safe.
        /// This is important because the code that initiates and cancels messages may
        /// not talk directly with TalkWithServer.  More likely that code will send a
        /// request to SendManager which will put the request in a queue and call
        /// TalkWithServer.SendMessage() in a different thread.
        /// This is not used if response is null.
        /// </param>
        public void SendMessage(Dictionary<string, byte[]> message, Response response = null,
            bool streaming = false, object clientId = null, CancelToken cancelToken = null)
        {
            lock (_lock)
            {
                if (_outstandingRequests != null && _status != Status.scsDisconnected && ValidMessage(message))
                {
                    // Send message.
                    // Invariant: (_status == disconnected) <==> (_outstandingRequests == null)
                    Int64 messageId = 0;
                    if (response != null)
                    {
                        OutstandingRequest newRequest = new OutstandingRequest();
                        newRequest.callback = response;
                        newRequest.streaming = streaming;
                        newRequest.clientId = clientId;
                        newRequest.originalRequest = message;
                        messageId = Interlocked.Increment(ref _lastMessageId);
                        _outstandingRequests.Add(messageId, newRequest);
                        if (null != cancelToken)
                        {
                            Thread.MemoryBarrier();
                            // Remember that someone can cancel a message before or during the call
                            // to this function.  The following call to Attach() might immediately
                            // try to cancel the message.  So we can't call Attach() until after we
                            // are done with _outstandingRequests.
                            cancelToken.Attach(this, messageId);
                            if (cancelToken.IsCanceled)
                                // Perhaps a silly optimization.  It's entirely possible that the
                                // message will be canceled in another thread one op code after
                                // this test.
                                return;
                        }
                    }
                    if (messageId != 0)
                    {
                        CompressAndSendWithSize("message_id");
                        CompressAndSendWithSize(Convert.ToString(messageId));
                    }
                    foreach (KeyValuePair<string, byte[]> pair in message)
                    {
                        if (IsSendableField(pair))
                        {
                            CompressAndSendWithSize(pair.Key);
                            CompressAndSendWithSize(pair.Value);
                        }
                    }
                    // A length of 0 marks the end of the message.
                    CompressAndSendInteger(0);
                    FlushCompressionBufferSoon();
                    // Done!  Do not send error report!
                    return;
                }
            }
            // Else, unable to send message, so send an error immediately.
            SendErrorResponse(clientId, response, streaming);
        }

        /// <summary>
        /// Give up the resources.  After calling Disconnect() you should not
        /// use this object any more.
        /// Turn off the global callbacks.  That's consistent with the normal
        /// usage in the Delphi version.  We don't care about the going down
        /// messages because we're the one who told it to go down.  But send
        /// all of the per-message callbacks.  That's part of the contract.
        /// And the individual listeners do not have any other way to know that
        /// we went down.
        /// </summary>
        public void Disconnect()
        {
            // RA: We don't log this.  Somewhere in TI Pro is requesting the
            // Disconnect(), most likely from ConnectionLifeCycle::SoftReset().
            _miscCallbacks = null;
            ConnectionDown();
            // _connection is null if the constructor failed before creating it.
            // The finalizer can still bring us here in that case.
            IConnection connection = _connection;
            if (connection != null)
                connection.Disconnect();
        }

        /// <summary>
        /// Disconnect
        /// </summary>
        ~TalkWithServer()
        {
            try
            {
                Disconnect();
            }
            catch
            {
                // An exception escaping from a finalizer takes down the whole
                // process.  There is nobody left to report this to.
            }
        }

        private static Encoding _utf8WithException = Encoding.GetEncoding("utf-8",
            new EncoderExceptionFallback(), new DecoderExceptionFallback());
        private static Encoding _latin1 = Encoding.GetEncoding("Latin1",
            new EncoderReplacementFallback(), new DecoderReplacementFallback());

        /// <summary>
        /// The server / protocol is a little bit sloppy when it comes to
        /// encodings.  We assume that everything is UTF-8.  If that
        /// fails, then we try latin-1.  That's not true for everything,
        /// but it should be right most of the time, and it won't fail
        /// to return something.
        /// </summary>
        /// <param name="bytes">The bytes to decode.</param>
        /// <returns>The text decoded as UTF-8 if the bytes are valid UTF-8, otherwise as latin-1.</returns>
        public static String BytesToStr(byte[] bytes)
        {
            try
            {
                return _utf8WithException.GetString(bytes);
            }
            catch { }
            return _latin1.GetString(bytes);
        }

        /// <summary>
        /// Callback from the connection.  Data can flow now.
        /// </summary>
        public void OnConnected(IConnection connection)
        {
            _status = Status.scsConnected;
        }

        /// <summary>
        /// Callback from the connection.  Log the first MAX_ERRORS_TO_LOG errors.
        /// </summary>
        public void OnError(Exception ex)
        {
            if (_errorCounter < MAX_ERRORS_TO_LOG)
            {
                TILogger.Error(ex.Message, ex);
                _errorCounter++;
            }
        }
    }
}