using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using zlib;
using TradeIdeas.MiscSupport;
//using ComponentAce.Compression.Libs.zlib;
using TradeIdeas.Logging;
//using System.IO;
using System.Diagnostics;
namespace TradeIdeas.ServerConnection
{
///
/// The TalkWithServer class communicates with the server.
///
/// This unit only receives messages from the main program and sends them to
/// the server, and vice versa. It knows nothing about the content.
/// This unit is responsible for marshaling and unmarshaling the messages. It
/// takes in messages in the form of Dictionary(String, byte[]). See
/// OnMessageFromServer for the return message format. This unit is also
/// responsible for compression.
///
/// The outgoing message usually maps human readable strings to human readable
/// strings. However, it doesn't have to. The only real limits are
/// 1) The key cannot be "".
/// 2) The key "message_id" is reserved, and is set by this class.
/// 3) Keys cannot be duplicated.
/// We accept the key as a string because, in practice, it's always a simple
/// string. We use the ASCII encoding on it. The data is usually a simple
/// string, but not in the proxy unit. That unit encodes one or more fields
/// in a record as raw bytes. (I.e. a long is encoded as 8 bytes, least
/// significant byte first.) This format was chosen because it was very
/// fast in C++ and Pascal. We could just use a type cast to convert a
/// record into an array of bytes. The C# version is able to talk in this
/// protocol, although I'm not sure how efficient the code is!
///
/// This automatically combines multiple messages going from the client to the
/// server for the sake of efficiency. See FlushCompressionBufferSoon() for
/// more info. The algorithm is slightly different from the Delphi version,
/// because we use threads in a completely different way.
///
/// Notes on threading: The callbacks from this object can come from any
/// thread. In the case of certain errors, the callback will come immediately,
/// in the same thread, before the function returns.
///
/// Callbacks initiating from the server are serialized. You will not get more
/// than one callback at a time. If the callback takes a long time to complete,
/// that will slow down the other callbacks, and other aspects of this object.
///
/// This object is thread safe. All public methods can be called from any
/// thread. When you get a callback no mutexes are active.
///
public class TalkWithServer : ConnectionCallbacks
{
// _status starts at the default value scsNew. OnConnected() sets it to
// scsConnected and ConnectionDown() sets it to scsDisconnected.
///
/// For simplicity you can never reuse a connection. When a
/// connection fails you can throw it away and start with a new
/// one. That way it is perfectly clear when all the details
/// are reset.
///
///
///
public enum Status
{
///
/// Not initialized yet.
///
scsNew,
///
/// Data is flowing
///
scsConnected,
///
/// No longer can be used.
///
scsDisconnected
}
///
/// There are several callbacks. Any of them could happen in any thread.
/// We group them all in a single interface so that we can easily set
/// them in the constructor, and never change them again. Changing a
/// callback at other times would be complicated with the multiple
/// threads.
///
///
public interface IListener
{
///
/// These are events that are specific to the entire connection.
/// Individual replies from the server go directly to whoever requested
/// them and use the delegate below.
///
///
void OnDisconnected(String errorMessage);
///
/// An error was worth reporting to the user. Aside from reporting,
/// we are taking care of this ourselves. Traditionally this only came
/// from the HTTP connection.
///
///
void OnAutoRetry(String errorMessage);
///
/// args.body is null on error.
///
///
void OnMessagePreview(PreviewArgs args);
}
private static int _errorCounter;
private static int _warningCounter;
private static int _sessionId = 0;
private const int MAX_ERRORS_TO_LOG = 20;
private const int MAX_WARNINGS_TO_LOG = 20;
/// <summary>
/// Used by <see cref="IListener.OnMessagePreview"/>.
/// </summary>
public struct PreviewArgs
{
///
/// Null on error. Otherwise it's what we got from the server.
///
public Byte[] body;
///
/// This is only meaningful if originalRequest is not null.
///
public DateTime originalTimestamp;
///
/// This might be null.
///
public Dictionary<string, byte[]> originalRequest;
public static PreviewArgs Empty
{
get
{ // Ideally the timestamp and the originalRequest would
// always be filled in. But that would require some changes
// to the way we do the abort code. And at the moment I'm
// not worried about the abort code.
PreviewArgs result;
result.body = null;
result.originalTimestamp = DateTime.MinValue;
result.originalRequest = null;
return result;
}
}
}
///
/// Create one of these if you want to call SendMessage() and possibly
/// call Cancel() later. This is not necessary if you never plan to
/// call Cancel().
///
/// Do not try to reuse one of these objects!
///
public class CancelToken
{
bool _cancelled;
private class Info
{
private readonly TalkWithServer _talkWithServer;
private readonly Int64 _id;
public void Cancel()
{
_talkWithServer.CancelMessage(_id);
}
public Info(TalkWithServer talkWithServer, Int64 id)
{
_talkWithServer = talkWithServer;
_id = id;
}
}
private Info _info;
///
/// If you called TalkWithServer.SendMessage() directly, you can call
/// this Cancel() method to cancel any further callbacks. If you
/// used SendManager.SendMessage(), then you should use
/// SendManager.Cancel() to cancel the callbacks.
///
public void Cancel()
{
_cancelled = true;
Thread.MemoryBarrier();
if (null != _info)
_info.Cancel();
}
/// <summary>
/// This is aimed at TalkWithServer.SendMessage().
///
/// To cancel a request you need certain information, specifically the
/// TalkWithServer object and the message id number. However, the
/// message id number isn't known until the middle of SendMessage().
/// So SendMessage() calls this.
///
/// Note that this might immediately call TalkWithServer.CancelMessage().
///
/// This is thread safe.
/// </summary>
/// <param name="talkWithServer">The object that manages this message.</param>
/// <param name="id">The id number generated by TalkWithServer, sent to the server,
/// and sent back with the reply.</param>
internal void Attach(TalkWithServer talkWithServer, Int64 id)
{
System.Diagnostics.Debug.Assert(null == _info, "Cannot reuse a TalkWithServer.CancelToken.");
_info = new Info(talkWithServer, id);
Thread.MemoryBarrier();
if (_cancelled)
_info.Cancel();
}
///
/// True if the message has been canceled. This can change at any time from any thread.
///
public bool IsCanceled { get { return _cancelled; } }
}
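// A minimal usage sketch for CancelToken, assuming "server" is a connected
// TalkWithServer, "message" was built with CreateMessage(), and OnReply is
// a hypothetical method matching the Response delegate:
//
//   var token = new TalkWithServer.CancelToken();
//   server.SendMessage(message, OnReply, streaming: true, cancelToken: token);
//   // Later, possibly from another thread, when replies are no longer wanted:
//   token.Cancel();
//
// Create a new token for each call to SendMessage().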
/// <summary>
/// This is the response from the server to an individual request.
/// </summary>
/// <param name="body">Body is null on error.</param>
/// <param name="clientId">
/// Whatever the client specified when sending the request.
/// </param>
public delegate void Response(byte[] body, object clientId);
public static Dictionary<string, byte[]> CreateMessage(params Object[] items)
{
return CreateMessageFromArray(items);
}
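// Illustrative example of the conversions done by CreateMessageFromArray()
// (the field names and values here are hypothetical):
//
//   CreateMessage("command", "ping", "count", 5, "live", true, "note", null)
//
// returns a dictionary with three entries: "command" maps to the bytes of
// "ping", "count" to the bytes of "5", and "live" to the bytes of "1".
// The "note" pair is skipped because its value is null.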
private static readonly byte[] TRUE_AS_BYTES = Encoding.ASCII.GetBytes("1");
private static readonly byte[] FALSE_AS_BYTES = Encoding.ASCII.GetBytes("0");
public static Dictionary<string, byte[]> CreateMessageFromArray(Object[] items)
{
if ((items.Length % 2) != 0)
return null;
Dictionary<string, byte[]> result = new Dictionary<string, byte[]>();
for (int i = 0; i < items.Length; i += 2)
{
string name = (string)items[i];
object value = items[i + 1];
if ((name == null) || (value == null))
// We allow this as a convenience. Sometimes it's easier
// to call this function once, with all possible fields,
// even though some fields don't matter.
continue;
byte[] bytes = value as byte[];
if (bytes == null)
{
if (value is DateTime)
{
long asTimeT = ServerFormats.ToTimeT((DateTime)value);
string asString = asTimeT.ToString();
bytes = Encoding.ASCII.GetBytes(asString);
}
else if (value is double)
{
double asDouble = (double)value;
string asString = ServerFormats.ToString(asDouble);
bytes = Encoding.ASCII.GetBytes(asString);
}
else if (value is bool)
bytes = ((bool)value) ? TRUE_AS_BYTES : FALSE_AS_BYTES;
else
bytes = Encoding.UTF8.GetBytes(value.ToString());
}
result[name] = bytes;
}
return result;
}
// This object will be used any time we want to lock the entire object.
//
// I looked at more precise locks.
// For simplicity I eventually decided to have only one lock.
//
// We could always lock "this". That was considered bad style in some
// places. It was better to lock a private object.
//
// The actual object in this variable was chosen somewhat arbitrarily.
// It was just an object that was private and not expected to be locked
// by anyone else.
private Object _lock;
private IConnection _connection;
private void ConnectionDown()
{
// IReadOnlyCollection would be ideal here. But that's not available until
// .NET 4.5. A lot of our code uses .NET 4.5, but not this library.
// We used to use IEnumerable here. That doesn't have the Count property.
Dictionary<Int64, OutstandingRequest>.ValueCollection outstandingRequests;
// We have to use a lock here to keep the _status and the
// _outstandingRequests consistent. The rule is that every request will
// get a callback when it is fulfilled or when the connection is broken.
// When we add a new request, we might see that the connection is down,
// and immediately do the callback. We should send exactly one callback,
// either from there or from here.
lock (_lock)
{
_status = Status.scsDisconnected;
if (_outstandingRequests == null)
// Already down! Outstanding requests is already null so we
// can't even try to do anything.
return;
outstandingRequests = _outstandingRequests.Values;
_outstandingRequests = null;
}
System.Diagnostics.Debug.WriteLine(DateTime.Now.ToShortTimeString()
+ " Connection to proxy broken, all " + outstandingRequests.Count
+ " pending requests canceled.");
// Don't make any callbacks inside of a lock.
IListener miscCallbacks = _miscCallbacks;
if (miscCallbacks != null)
try
{
miscCallbacks.OnDisconnected(""); // TODO need error message.
}
catch { }
foreach (OutstandingRequest request in outstandingRequests)
{
if ((request != null) && (request.callback != null))
SendErrorResponse(request.clientId, request.callback,
request.streaming);
}
}
void ConnectionCallbacks.OnClosed(IConnection connection)
{
if (_status == Status.scsConnected)
{
if (_warningCounter < MAX_WARNINGS_TO_LOG)
{
TILogger.Warning("Connection Closed");
_warningCounter++;
}
}
// This is a callback from the Connection.
ConnectionDown();
}
void ConnectionCallbacks.OnRead(byte[] read, int count,
IConnection connection)
{
_decompressor.next_in = read;
_decompressor.next_in_index = 0;
_decompressor.avail_in = count;
if (_decompressor.next_out == null)
{
_decompressor.next_out = new byte[1024];
_decompressor.avail_out = _decompressor.next_out.Length;
}
bool outputBufferFull = false;
bool error = false;
while (((_decompressor.avail_in > 0) || outputBufferFull) && !error)
{
if (_decompressor.avail_out < 100)
{
_decompressor.avail_out += _decompressor.next_out.Length;
Array.Resize(ref _decompressor.next_out,
_decompressor.next_out.Length * 2);
}
int result = _decompressor.inflate(zlibConst.Z_SYNC_FLUSH);
if (!((result == zlibConst.Z_OK) || (outputBufferFull && (result == zlibConst.Z_BUF_ERROR))))
error = true;
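else
{
// Remember whether inflate() filled the output buffer. If it did, the
// decompressor may be holding more output, so we loop again even when
// avail_in is 0. Z_BUF_ERROR on that extra pass just means there was
// nothing more to produce.
outputBufferFull = (_decompressor.avail_out == 0);
CheckForCompressedMessage();
}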
}
if (error)
{
if (_warningCounter < MAX_WARNINGS_TO_LOG)
{
TILogger.Warning(message: "Decompressor Error");
_warningCounter++;
}
_connection.Disconnect();
}
}
// Outgoing to server
//
// We need a lock to access these variables. If two threads try to send
// a message at the same time, we need to send one complete message followed
// by another. We don't want the bytes of the two messages to be interleaved.
private List _outgoingBuffer = new List(); // already compressed, if required.
// Next thing to give to the compressor. We don't give it up right away
// because we don't know if we want to flush or not.
private byte[] _toCompress;
private ZStream _compressor = new ZStream();
private void SendBytes(byte[] data)
{
//_debugOut.Add(data);
_connection.Send(data);
}
//private static List _debugOut = new List();
private void CompressAndSendBytes(byte[] data, bool flush)
{
_compressor.next_in = data;
_compressor.next_in_index = 0;
_compressor.avail_in = _compressor.next_in.Length;
do
{
_compressor.next_out = new byte[10240];
_compressor.next_out_index = 0;
_compressor.avail_out = _compressor.next_out.Length;
_compressor.deflate(flush ? zlibConst.Z_SYNC_FLUSH : zlibConst.Z_NO_FLUSH);
// Silently ignore any errors here. We don't expect any real
// problems here, and we don't want to worry about the special case
// where we weren't sure if there was more in the compressor or
// not.
if (_compressor.next_out_index > 0)
{
if (_compressor.next_out_index < _compressor.next_out.Length)
{
Array.Resize(ref _compressor.next_out,
_compressor.next_out_index);
}
// As soon as the compressor spits out data, send it to the
// network connection. Let the network connection handle any
// further buffering. We do this mostly for simplicity. But
// there was also a strange case in the Delphi code where the
// client was busy and was constantly writing to a buffer for a
// long time, and eventually the server would time out.
SendBytes(_compressor.next_out);
}
}
while ((_compressor.avail_in > 0) || (_compressor.avail_out == 0));
}
private void CompressAndSendBytes(byte[] data)
{
if (data.Length == 0) return;
if (_toCompress != null)
CompressAndSendBytes(_toCompress, false);
_toCompress = data;
}
private void CompressAndSendInteger(int i)
{
byte[] data = new byte[4];
// Intel byte order.
data[0] = (byte)i;
data[1] = (byte)(i >> 8);
data[2] = (byte)(i >> 16);
data[3] = (byte)(i >> 24);
CompressAndSendBytes(data);
}
private void CompressAndSendWithSize(byte[] data)
{
CompressAndSendInteger(data.Length);
CompressAndSendBytes(data);
}
private void CompressAndSendWithSize(String data)
{
CompressAndSendWithSize(Encoding.ASCII.GetBytes(data));
}
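// A sketch of the uncompressed byte stream these helpers produce when
// SendMessage() marshals a message. Each key and each value is written as
// a four byte length (least significant byte first) followed by the raw
// bytes. A final zero length, written where the next key would start, ends
// the message. For a hypothetical message with the single field
// "command" = "ping" and no callback:
//
//   07 00 00 00  'c' 'o' 'm' 'm' 'a' 'n' 'd'
//   04 00 00 00  'p' 'i' 'n' 'g'
//   00 00 00 00
//
// When a callback is supplied, a "message_id" field is written first in
// the same format.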
private void FlushCompressionBuffer()
{
if (_toCompress != null)
{
CompressAndSendBytes(_toCompress, true);
_toCompress = null;
}
}
// Attempt to defer the flush until after multiple messages have been added. That will
// make things more efficient. If we can group multiple messages before a flush, we
// will send fewer bytes and fewer packets.
//
// This will never work quite as well as the Delphi version. That relied on the fact
// that only the GUI thread could send messages. All messages sent from one event (in
// the GUI message pump) were serviced by a single flush. This version will actually
// be more efficient on a machine with one CPU than a machine with lots of CPUs.
//
// I looked at some alternative ways to do this. If the worker thread were responsible
// for marshalling the message, not just flushing the compression buffer, that would
// slow down the rate at which we flushed messages, and speed up the thread that
// generates messages. That would possibly give us more messages per flush. Also, if
// there are multiple connections, each one would do a lot of work in its own thread.
// So if some connections demanded more work than others, this might spread the work
// out more fairly.
//
// A better answer might be to create a single thread which is responsible for all
// reading and writing. Anything that is currently protected by the _lock would only
// be accessible from that thread. Any attempt to send a message, disconnect, or
// receive bytes from the server would be put into a queue. We'd only flush when the
// queue was empty. That would allow us to accumulate a large number of messages.
private volatile bool _flushIsPending = false;
private void FlushCompressionBufferSoon()
{
lock (_lock)
{
if (!_flushIsPending)
{
_flushIsPending = true;
ThreadPool.QueueUserWorkItem(new WaitCallback(FlushCompressionBufferCallback));
}
}
}
private void FlushCompressionBufferCallback(object state)
{
Thread.Sleep(0);
lock (_lock)
{
FlushCompressionBuffer();
_flushIsPending = false;
}
}
// Incoming from server.
private ZStream _decompressor = new ZStream();
private static int UnmarshalInt(byte[] data, int start)
{
int result = (int)(data[start]);
result += ((int)(data[start + 1])) << 8;
result += ((int)(data[start + 2])) << 16;
result += ((int)(data[start + 3])) << 24;
return result;
}
private static Int64 UnmarshalInt64(byte[] data, int start)
{
Int64 result = 0;
start += 7;
for (int i = 0; i < 8; i++)
{
result <<= 8;
result += data[start];
start--;
}
return result;
}
private void CheckForCompressedMessage()
{ // This will unmarshal as many complete messages as we can find
// and send them all to the client.
//
// All integers are sent in the standard Intel byte order (least
// significant byte first). Each message starts with a message id. This
// came from the message_id field that we sent to the server. This is
// sent as an eight byte integer. After that we receive the length of the
// message. That is a four byte integer. Length describes the number of
// bytes in the body. Finally we receive the body of the message. This
// can be any string of bytes. Most often it is XML, but sometimes it
// contains binary data, including gif files.
//
// This code is not reentrant, nor is it protected by locks. We assume
// that the code which sends us new data (the Connection class) will not
// send us more data until we finish the callback from the last data.
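//
// Illustrative layout of one incoming message, derived from the
// unmarshaling code below (the id and body values are made up):
//   bytes 0..7    message id, Int64, least significant byte first
//   bytes 8..11   body length N, Int32, least significant byte first
//   bytes 12..    N bytes of body
// For example, id 5 with the two byte body "ok" arrives as these 14 bytes:
//   05 00 00 00 00 00 00 00  02 00 00 00  6F 6B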
int consumed = 0;
while (true)
{
if (_decompressor.next_out_index - consumed < 12) break;
Int64 id = UnmarshalInt64(_decompressor.next_out, consumed);
int size = UnmarshalInt(_decompressor.next_out, consumed + 8);
int totalSize = size + 12;
if (_decompressor.next_out_index - consumed < totalSize) break;
byte[] body = new byte[size];
Array.Copy(_decompressor.next_out, consumed + 12, body, 0, size);
SendGoodResponse(id, body);
consumed += totalSize;
}
if (consumed > 0)
{
Array.Copy(_decompressor.next_out, consumed, _decompressor.next_out, 0, _decompressor.next_out_index - consumed);
_decompressor.next_out_index -= consumed;
_decompressor.avail_out += consumed;
}
}
// Status to client.
private Status _status;
///
/// Warning: This can change at any time.
///
private IListener _miscCallbacks;
private class OutstandingRequest
{
public Response callback;
public bool streaming;
public object clientId;
public readonly DateTime timestamp = DateTime.Now;
public Dictionary<string, byte[]> originalRequest;
}
// _outstandingRequests must be protected by the lock.
private Dictionary<Int64, OutstandingRequest> _outstandingRequests = new Dictionary<Int64, OutstandingRequest>();
private void SendErrorResponse(object clientId, Response callback, bool streaming)
{ // We don't display the error for streaming messages. This is a little
// ugly. We're trying to solve a specific problem. The server can
// disconnect us in a timeout. This happens when we are already stopped,
// and not really expecting data. In this case we don't want to see a
// yellow tickmark because that implies that there was a problem and we
// are going to retry. Yuck!
IListener miscCallbacks = _miscCallbacks;
if ((miscCallbacks != null) && (!streaming))
try
{
miscCallbacks.OnMessagePreview(PreviewArgs.Empty);
}
catch { }
if (callback != null)
try
{
callback(null, clientId);
}
catch (Exception e)
{
// Don't let a misbehaving callback take down this thread.
Debug.WriteLine("Exception in response callback: " + e);
}
}
private readonly char[] NEWLINE_ONLY = new char[] { '\n' };
private void OneServerDisconnected(byte[] body)
{
string disconnectMessage = BytesToStr(body);
//System.Diagnostics.Debug.Write(DateTime.Now.ToLongTimeString() + ": OneServerDisconnected(" + disconnectMessage + ")");
string[] pieces = disconnectMessage.Split(NEWLINE_ONLY);
if (pieces.Length < 2)
{
System.Diagnostics.Debug.Write("Invalid server disconnected message. Not enough entries. " + disconnectMessage);
return;
}
Int64 firstId;
if (!Int64.TryParse(pieces[0], out firstId))
{
System.Diagnostics.Debug.Write("Invalid server disconnected message. Can't parse first id. " + disconnectMessage);
return;
}
Int64 lastId;
if (!Int64.TryParse(pieces[1], out lastId))
{
System.Diagnostics.Debug.Write("Invalid server disconnected message. Can't parse last id. " + disconnectMessage);
return;
}
HashSet<string> commandNames = new HashSet<string>();
for (int i = 2; i < pieces.Length; i++)
{
string next = pieces[i];
if (next != "")
commandNames.Add(next);
}
SortedDictionary<Int64, OutstandingRequest> toReport =
new SortedDictionary<Int64, OutstandingRequest>();
lock (_lock)
{
if (null == _outstandingRequests)
return;
foreach (var kvp in _outstandingRequests)
{
if ((kvp.Key >= firstId) && (kvp.Key <= lastId))
{
byte[] command;
if (kvp.Value.originalRequest.TryGetValue("command", out command))
if (commandNames.Contains(BytesToStr(command)))
toReport[kvp.Key] = kvp.Value;
}
}
foreach (var kvp in toReport)
_outstandingRequests.Remove(kvp.Key);
}
StringBuilder debugMessage = new StringBuilder();
debugMessage.Append(DateTime.Now.ToShortTimeString());
debugMessage.Append(" One server disconnected. First id = ");
debugMessage.Append(firstId);
debugMessage.Append(", last id = ");
debugMessage.Append(lastId);
debugMessage.Append(", found ");
debugMessage.Append(toReport.Count);
debugMessage.Append(" messages, commands = (");
bool first = true;
foreach (string command in commandNames)
{
if (first)
first = false;
else
debugMessage.Append(", ");
debugMessage.Append(command);
}
debugMessage.Append(").");
System.Diagnostics.Debug.WriteLine(debugMessage.ToString());
foreach (var kvp in toReport)
SendErrorResponse(kvp.Value.clientId, kvp.Value.callback, kvp.Value.streaming);
}
private void SendGoodResponse(Int64 msgId, byte[] body)
{
//File.AppendAllText("WebSocketGatewayResponses.txt", $"{DateTime.Now:HH:mm.ss.fff}{Environment.NewLine}{Encoding.UTF8.GetString(body)}{Environment.NewLine}{Environment.NewLine}");
Debug.WriteLine($"{DateTime.Now:HH:mm:ss.fff}{Environment.NewLine}{Encoding.UTF8.GetString(body)}{Environment.NewLine}{Environment.NewLine}");
//String bodyAsString = BytesToStr(body);
//if (bodyAsString.Length > 50)
// bodyAsString = bodyAsString.Substring(0, 49) + '…';
//System.Diagnostics.Debug.WriteLine(msgId + ": " + bodyAsString);
if (msgId == -1)
{
//RA: There are too many of these as indicated in Loggly, will turn off for now and review with Phil and Michael F
//if (_warningCounter < MAX_WARNINGS_TO_LOG)
//{
// TILogger.Warning(message: $"-1 Message Id Received with Body: {BytesToStr(body)}");
// _warningCounter++;
//}
OneServerDisconnected(body);
return;
}
OutstandingRequest request = null;
lock (_lock)
{
if (_outstandingRequests == null)
// This might be possible. We are trying to shut down, but we
// still got some data after that. That shouldn't happen, but
// might be an artifact of the multiple threads.
return;
_outstandingRequests.TryGetValue(msgId, out request);
if (request == null)
return;
if (!request.streaming)
_outstandingRequests.Remove(msgId);
}
// Do the callbacks without holding any locks. We were getting into
// trouble with deadlock when we tried to hold the lock during the
// callback. At one time (in the early Delphi days) we tried to hold
// the lock so we could do a better job canceling requests.
if (request.callback != null)
{
IListener miscCallbacks = _miscCallbacks;
if (miscCallbacks != null)
try
{
PreviewArgs args;
args.body = body;
args.originalTimestamp = request.timestamp;
args.originalRequest = request.originalRequest;
miscCallbacks.OnMessagePreview(args);
if (body == null)
{
if (_warningCounter < MAX_WARNINGS_TO_LOG)
{
TILogger.Warning(message: $"MessageId:{msgId}, Null Body");
_warningCounter++;
}
}
}
catch { }
//DateTime startTime = DateTime.Now;
//StringBuilder sb = new StringBuilder();
//sb.Append(startTime);
//sb.Append(": command=");
//sb.Append(BytesToStr(request.originalRequest["command"]));
try
{
// File.AppendAllText("responsesTcpIp.txt", $"{msgId}:{Encoding.UTF8.GetString(body)}{Environment.NewLine}{Environment.NewLine}");
request.callback(body, request.clientId);
}
catch (Exception e)
{
//sb.Append(", exception=");
//sb.Append(e);
Debug.WriteLine("Exception in response callback: " + e);
}
//sb.Append(", time=");
//sb.Append((DateTime.Now - startTime).Ticks / 10000.0);
//sb.Append("ms");
//System.Diagnostics.Debug.WriteLine(sb);
}
}
void ConnectionCallbacks.OnAutoRetry(IConnection source, String msg)
{
IListener miscCallbacks = _miscCallbacks;
if (miscCallbacks != null)
try
{
miscCallbacks.OnAutoRetry(msg);
}
catch { }
}
// public
///
/// Communicate with server
///
///
///
public TalkWithServer(IConnectionFactory connectionFactory, IListener miscCallbacks)
{
//_debugOut.Add(new byte[]{});
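// Constructors can run on any thread, so update the shared counter atomically.
Interlocked.Increment(ref _sessionId);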
_lock = _decompressor;
_compressor.deflateInit(5);
_decompressor.inflateInit();
_miscCallbacks = miscCallbacks;
_connection = connectionFactory.CreateConnection(this);
TIUserSession.ConnectionType = _connection.GetType().Name;
}
///
/// This was separated from the constructor so you'd have time to store this object in a variable.
///
public void Connect()
{
_connection.Connect();
SendBytes(Encoding.ASCII.GetBytes("command=set_output&mode=zlib64\r\ncommand=set_input&mode=zlib\r\n"));
}
private static bool ValidMessage(Dictionary<string, byte[]> message)
{ // We skip/ignore certain types of fields. There must be at least
// one field that we don't skip or ignore.
// We also skip null values to avoid ArgumentNullException.
if (message == null)
return false;
foreach (KeyValuePair<string, byte[]> pair in message)
if ((!String.IsNullOrEmpty(pair.Key)) && (pair.Key != "message_id") && (pair.Value != null))
return true;
return false;
}
///
/// Get status of connection
///
///
public Status GetStatus()
{
return _status;
}
///
/// You must hold the _lock to access this.
///
/// At one time we were using Interlocked.Increment(). That version of
/// the code had multiple issues.
///
/// 0 is always reserved. If we don't expect a response from a message,
/// we give it an id of 0. If we receive a response with message id
/// 0, we always throw it into the bit bucket.
///
private static Int64 _lastMessageId = 0;
///
/// Call this if you are no longer interested in callbacks for a message.
///
/// Note: This does not guarantee that responses will stop immediately.
/// A callback might still occur. This is a multithreaded library, and
/// the callbacks are not called from a lock.
///
/// Note: This does not notify the server. Many server requests have
/// corresponding requests to cancel them.
///
/// The purpose of this method is to remove unused memory from the
/// TalkWithServer object. Each time you send a request with a callback
/// that allocates some memory to handle the callback.
///
/// Use CancelToken if you need to cancel a message. CancelToken will
/// call this function. CancelToken will also take care of other
/// details, like pointing to the right TalkWithServer object, and
/// managing some multi-threaded issues.
///
///
private void CancelMessage(Int64 messageId)
{
lock (_lock)
{
if (null != _outstandingRequests)
_outstandingRequests.Remove(messageId);
}
}
// On error, this will immediately call the callback. Same thread, even
// before returning from this function. Errors can also come back in
// another thread, for example if we are disconnected before receiving
// the response.
/// <summary>
/// Send a message and optionally expect a stream of responses. If
/// streaming is true, we will continue to listen for responses until
/// the connection is broken or the message is canceled. If response is
/// null, no response is expected; if the server sends one, it is ignored.
/// </summary>
/// <param name="message">Send this to the server.</param>
/// <param name="response">Callback when we get a response. Can be null if you don't want a response.</param>
/// <param name="streaming">
/// True if we're expecting more than one response. Otherwise the callback
/// will be removed as soon as we get the first response. This is not used if
/// response is null.
/// </param>
/// <param name="clientId">
/// This is returned as part of the callback. It can be anything.
/// This is not used if response is null.
/// </param>
/// <param name="cancelToken">
/// This should be null if you have no intention of ever canceling the message.
/// Otherwise the CancelToken will be associated with the message. Use the
/// CancelToken to cancel the message, when you don't want any more responses.
/// Note that you can cancel a message before, during or after this call to
/// TalkWithServer.SendMessage(). Sending and canceling are both thread safe.
/// This is important because the code that initiates and cancels messages may
/// not talk directly with TalkWithServer. More likely that code will send a
/// request to SendManager which will put the request in a queue and call
/// TalkWithServer.SendMessage() in a different thread.
/// This is not used if response is null.
/// </param>
public void SendMessage(Dictionary<string, byte[]> message,
Response response = null, bool streaming = false,
object clientId = null, CancelToken cancelToken = null)
{
lock (_lock)
{
if (_outstandingRequests != null && _status != Status.scsDisconnected && ValidMessage(message))
{ // Send message.
// assert(_outstandingRequests != null) -- Invariant: (_status == disconnected) <==> (_outstandingRequests == null)
Int64 messageId = 0;
if (response != null)
{
OutstandingRequest newRequest = new OutstandingRequest();
newRequest.callback = response;
newRequest.streaming = streaming;
newRequest.clientId = clientId;
newRequest.originalRequest = message;
messageId = ++_lastMessageId;
_outstandingRequests.Add(messageId, newRequest);
if (null != cancelToken)
{
Thread.MemoryBarrier();
// Remember that someone can cancel a message before or during the call
// to this function. The following call to Attach() might immediately
// try to cancel the message. So we can't call Attach() until after we
// are done with _outstandingRequests.
cancelToken.Attach(this, messageId);
if (cancelToken.IsCanceled)
// Perhaps a silly optimization. It's entirely possible that the
// message will be canceled in another thread one op code after
// this test.
return;
}
}
//var mb = new StringBuilder();
//mb.Append($"message_id={messageId}");
//foreach (var item in message)
//{
// mb.Append($"&{item.Key}={Encoding.UTF8.GetString(item.Value)}");
//}
//mb.Append("\r\n");
//File.AppendAllText("commands.txt", mb.ToString());
//SendBytes(Encoding.UTF8.GetBytes(mb.ToString()));
//return;
if (messageId != 0)
{
CompressAndSendWithSize("message_id");
CompressAndSendWithSize(Convert.ToString(messageId));
}
foreach (KeyValuePair<string, byte[]> pair in message)
if ((!String.IsNullOrEmpty(pair.Key)) && (pair.Key != "message_id") && (pair.Value != null))
{
CompressAndSendWithSize(pair.Key);
CompressAndSendWithSize(pair.Value);
}
CompressAndSendInteger(0);
FlushCompressionBufferSoon();
// Done! Do not send error report!
return;
}
}
// Else, unable to send message, so send an error immediately.
SendErrorResponse(clientId, response, streaming);
}
///
/// Give up the resources. After calling Disconnect() you should not
/// use this object any more.
/// Turn off the global callbacks. That's consistent with the normal
/// usage in the Delphi version. We don't care about the going down
/// messages because we're the one who told it to go down. But send
/// all of the per-message callbacks. That's part of the contract.
/// And the individual listeners do not have any other way to know that
/// we went down.
///
public void Disconnect()
{
if (_status == Status.scsConnected)
{
//RA: We don't need to log this because somewhere in TI Pro
//it's requesting to Disconnect(), most likely from ConnectionLifeCycle::SoftReset()
//if (_warningCounter < MAX_WARNINGS_TO_LOG)
//{
// TILogger.Warning(message: "Disconnect called from {}");
// _warningCounter++;
//}
}
_miscCallbacks = null;
ConnectionDown();
_connection.Disconnect();
}
///
/// Disconnect
///
~TalkWithServer()
{
Disconnect();
}
private static Encoding _utf8WithException =
Encoding.GetEncoding("utf-8", new EncoderExceptionFallback(), new DecoderExceptionFallback());
private static Encoding _latin1 =
Encoding.GetEncoding("Latin1", new EncoderReplacementFallback(), new DecoderReplacementFallback());
///
/// The server / protocol is a little bit sloppy when it comes to
/// encodings. We assume that everything is UTF-8. If that
/// fails, then we try latin-1. That's not true for everything,
/// but it should be right most of the time, and it won't fail
/// to return something.
///
///
/// UTF-8 or latin-1
public static String BytesToStr(byte[] bytes)
{
try
{
return _utf8WithException.GetString(bytes);
}
catch { }
return _latin1.GetString(bytes);
}
public void OnConnected(IConnection connection)
{
_status = Status.scsConnected;
}
public void OnError(Exception ex)
{
if (_errorCounter < MAX_ERRORS_TO_LOG)
{
TILogger.Error(ex.Message, ex);
_errorCounter++;
}
}
}
}