using System; using System.Collections.Generic; using System.Text; using TradeIdeas.ServerConnection; using System.Threading; /* This is the preferred way for most of the code to send messages to the server. * Some of the core utilities will access ConnectionLifeCycle directly, but most * of the program should go here, instead. * * Among other things, this class automatically adds a delay when necesary. In * most cases, if you get an error from the server (if the body of the message * is null) the easist thing to do is to immediately call this and ask to send * the message again. Otherwise, most classes would need their own timer * thread. That's how the Delphi code worked. Each class had its own timer * and its own rules for exactly when it would retry. */ namespace TradeIdeas.TIQData { public class SendManager { private Object _mutex = new object(); /// /// When is it reasonable to request a new connection. /// private DateTime _restartTime = new DateTime(); private readonly ConnectionLifeCycle _connectionLifeCycle; /// /// This only changes in the ConnectionLifeCycle thread. /// This is initially null, but it will never change from not null back to null. /// private TalkWithServer _connection; internal SendManager(ConnectionLifeCycle connectionLifeCycle) { _connectionLifeCycle = connectionLifeCycle; connectionLifeCycle.OnConnection += new ConnectionLifeCycle.ConnectionCallback(_connectionLifeCycle_OnConnection); connectionLifeCycle.OnTimer += new ConnectionLifeCycle.Callback(_connectionLifeCycle_OnTimer); } /// /// This is always called in the ConnectionLifeCycle thread. /// /// private void _connectionLifeCycle_OnTimer(ConnectionLifeCycle sender) { SendFromQueue(_connection); } private void SendFromQueue(TalkWithServer connection) { if (null == connection) return; if (connection.GetStatus() == TalkWithServer.Status.scsDisconnected) return; LinkedList toSend = null; lock (_mutex) { if (_allRequests.Count > 0) { toSend = _allRequests; _allRequests = new LinkedList(); _requestsWithCancel.Clear(); } } if (null == toSend) return; foreach (Request request in toSend) { connection.SendMessage(request.Message, request.Response, request.Streaming, request.ClientId, request.CancelToken); } } /// /// This is always called in the ConnectionLifeCycle thread. /// /// /// private void _connectionLifeCycle_OnConnection(ConnectionLifeCycle sender, ServerConnection.TalkWithServer serverConnection) { // Experiments suggest that the exact timeout doesn't matter much. When I disconnect the network // I see yellow tickmarks about twice a second, even if this number is much larger or smaller than // 500ms. That 500 milliseconds no doubt comes from the pause in ConnectionLifeCycle.cs. Maybe we // don't actually need a timer here. A previous revision of this code just used a flag to say that // we'd seen at least one timer event since the last connect event. Effectively we're doing the // same thing here. _restartTime = new DateTime().AddMilliseconds(750); _connection = serverConnection; SendFromQueue(serverConnection); } private class Request { public readonly Dictionary Message; public readonly TalkWithServer.Response Response; public readonly bool Streaming; public readonly object ClientId; public readonly TalkWithServer.CancelToken CancelToken; public Request(Dictionary message, TalkWithServer.Response response, bool streaming, object clientId, TalkWithServer.CancelToken cancelToken) { Message = message; Response = response; Streaming = streaming; ClientId = clientId; CancelToken = cancelToken; } } private LinkedList _allRequests = new LinkedList(); private Dictionary> _requestsWithCancel = new Dictionary>(); /// /// This is a wrapper around TalkWithServer.SendMessage(). /// On error, this will queue up requests before trying to reconnect. /// If the caller gets an error from the server, it's safe to call this /// immediately to retry. /// /// Send a message and optionally expect a stream of responses. If /// streaming is true, we will continue to listen for responses until /// the connection is broken. If response is null, no response is /// expected; if the server sends one, it is ignored. /// /// Errors can come back in any thread. Errors can come back in this /// thread, before this function finishes. (Ths current implemention /// no longer sends an error in the current thread, but that could /// change.) /// /// Send this to the server. /// Callback when we get a response. Can be null if you don't want a response. /// /// True if we're expecting more than one response. Otherwise the callback /// will be removed as soon as we get the first response. This is not used if /// response is null. /// /// /// This is returned as part of the callback. It can be anything. /// This is not used if response is null. /// /// /// This should be null if you have no intention of ever canceling the message. /// Otherwise the CancelToken will be associated with the message. Use the /// CancelToken to cancel the message, when you don't want any more responses. /// Do not reuse a CancelToken. If you use SendManager.SendMessage() (rather /// than going directly to TalkWithServer.SendMessage()) then you should use /// SendManager.Cancel() to cancel the message. /// This is not used if response is null. /// public void SendMessage(Dictionary message, TalkWithServer.Response response = null, bool streaming = false, object clientId = null, TalkWithServer.CancelToken cancelToken = null) { bool needToWake; lock (_mutex) { needToWake = _allRequests.Count == 0; Request request = new Request(message, response, streaming, clientId, cancelToken); LinkedListNode node = new LinkedListNode(request); _allRequests.AddLast(node); if (null != cancelToken) // Add will throw an exception if the cancelToken is already in the table. // That's good because you should not reuse those. That's not guaranteed // to catch everything but it helps. _requestsWithCancel.Add(cancelToken, node); } if (needToWake) // First item in the queue. If the queue wasn't already empty, then we already sent // the wakeup request. if (((null != _connection) && (_connection.GetStatus() != TalkWithServer.Status.scsDisconnected)) || (new DateTime() >= _restartTime)) // Either the conneciton is already good, or it's time to retry. _connectionLifeCycle.WakeUpSoon(); } /// /// Call this if you are no longer interested in a responses to a message. /// /// The primary purpose of this is to clean up any memory associated with the /// callbacks for this message. /// /// Calling this does not guarantee that the callbacks will immediately cease. This /// library is multi-threaded. The callbacks are not called from within a lock. /// /// If you call this before the message is actually sent to the server, the message /// might or might not be sent to the server. /// /// /// A value that you previously sent to SendMessage(). /// /// Null is allowed. In that case we do nothing. That's a convenience. Otherwise /// most callers would have to check for that themselves. /// public void CancelMessage(TalkWithServer.CancelToken cancelToken) { if (null == cancelToken) // No message to cancel. return; lock (_mutex) { LinkedListNode node; if (_requestsWithCancel.TryGetValue(cancelToken, out node)) { // Remove from our list. _requestsWithCancel.Remove(cancelToken); _allRequests.Remove(node); } } // It's tempting to do the next line only if the previous if was not true. // But it doesn't hurt to call TalkWithServer.CancelToken.Cancel(). That's // fast. And it marks the cancelToken.IsCanceled property as true. cancelToken.Cancel(); } } }