using System;
using System.Collections.Generic;
using System.Text;
using TradeIdeas.ServerConnection;
using System.Threading;
/* This is the preferred way for most of the code to send messages to the server.
* Some of the core utilities will access ConnectionLifeCycle directly, but most
* of the program should go here, instead.
*
* Among other things, this class automatically adds a delay when necesary. In
* most cases, if you get an error from the server (if the body of the message
* is null) the easist thing to do is to immediately call this and ask to send
* the message again. Otherwise, most classes would need their own timer
* thread. That's how the Delphi code worked. Each class had its own timer
* and its own rules for exactly when it would retry. */
namespace TradeIdeas.TIQData
{
public class SendManager
{
private Object _mutex = new object();
///
/// When is it reasonable to request a new connection.
///
private DateTime _restartTime = new DateTime();
private readonly ConnectionLifeCycle _connectionLifeCycle;
///
/// This only changes in the ConnectionLifeCycle thread.
/// This is initially null, but it will never change from not null back to null.
///
private TalkWithServer _connection;
internal SendManager(ConnectionLifeCycle connectionLifeCycle)
{
_connectionLifeCycle = connectionLifeCycle;
connectionLifeCycle.OnConnection += new ConnectionLifeCycle.ConnectionCallback(_connectionLifeCycle_OnConnection);
connectionLifeCycle.OnTimer += new ConnectionLifeCycle.Callback(_connectionLifeCycle_OnTimer);
}
///
/// This is always called in the ConnectionLifeCycle thread.
///
///
private void _connectionLifeCycle_OnTimer(ConnectionLifeCycle sender)
{
SendFromQueue(_connection);
}
private void SendFromQueue(TalkWithServer connection)
{
if (null == connection)
return;
if (connection.GetStatus() == TalkWithServer.Status.scsDisconnected)
return;
LinkedList toSend = null;
lock (_mutex)
{
if (_allRequests.Count > 0)
{
toSend = _allRequests;
_allRequests = new LinkedList();
_requestsWithCancel.Clear();
}
}
if (null == toSend)
return;
foreach (Request request in toSend)
{
connection.SendMessage(request.Message, request.Response, request.Streaming, request.ClientId, request.CancelToken);
}
}
///
/// This is always called in the ConnectionLifeCycle thread.
///
///
///
private void _connectionLifeCycle_OnConnection(ConnectionLifeCycle sender, ServerConnection.TalkWithServer serverConnection)
{ // Experiments suggest that the exact timeout doesn't matter much. When I disconnect the network
// I see yellow tickmarks about twice a second, even if this number is much larger or smaller than
// 500ms. That 500 milliseconds no doubt comes from the pause in ConnectionLifeCycle.cs. Maybe we
// don't actually need a timer here. A previous revision of this code just used a flag to say that
// we'd seen at least one timer event since the last connect event. Effectively we're doing the
// same thing here.
_restartTime = new DateTime().AddMilliseconds(750);
_connection = serverConnection;
SendFromQueue(serverConnection);
}
private class Request
{
public readonly Dictionary Message;
public readonly TalkWithServer.Response Response;
public readonly bool Streaming;
public readonly object ClientId;
public readonly TalkWithServer.CancelToken CancelToken;
public Request(Dictionary message, TalkWithServer.Response response,
bool streaming, object clientId, TalkWithServer.CancelToken cancelToken)
{
Message = message;
Response = response;
Streaming = streaming;
ClientId = clientId;
CancelToken = cancelToken;
}
}
private LinkedList _allRequests = new LinkedList();
private Dictionary> _requestsWithCancel =
new Dictionary>();
///
/// This is a wrapper around TalkWithServer.SendMessage().
/// On error, this will queue up requests before trying to reconnect.
/// If the caller gets an error from the server, it's safe to call this
/// immediately to retry.
///
/// Send a message and optionally expect a stream of responses. If
/// streaming is true, we will continue to listen for responses until
/// the connection is broken. If response is null, no response is
/// expected; if the server sends one, it is ignored.
///
/// Errors can come back in any thread. Errors can come back in this
/// thread, before this function finishes. (Ths current implemention
/// no longer sends an error in the current thread, but that could
/// change.)
///
/// Send this to the server.
/// Callback when we get a response. Can be null if you don't want a response.
///
/// True if we're expecting more than one response. Otherwise the callback
/// will be removed as soon as we get the first response. This is not used if
/// response is null.
///
///
/// This is returned as part of the callback. It can be anything.
/// This is not used if response is null.
///
///
/// This should be null if you have no intention of ever canceling the message.
/// Otherwise the CancelToken will be associated with the message. Use the
/// CancelToken to cancel the message, when you don't want any more responses.
/// Do not reuse a CancelToken. If you use SendManager.SendMessage() (rather
/// than going directly to TalkWithServer.SendMessage()) then you should use
/// SendManager.Cancel() to cancel the message.
/// This is not used if response is null.
///
public void SendMessage(Dictionary message,
TalkWithServer.Response response = null,
bool streaming = false,
object clientId = null,
TalkWithServer.CancelToken cancelToken = null)
{
bool needToWake;
lock (_mutex)
{
needToWake = _allRequests.Count == 0;
Request request = new Request(message, response, streaming, clientId, cancelToken);
LinkedListNode node = new LinkedListNode(request);
_allRequests.AddLast(node);
if (null != cancelToken)
// Add will throw an exception if the cancelToken is already in the table.
// That's good because you should not reuse those. That's not guaranteed
// to catch everything but it helps.
_requestsWithCancel.Add(cancelToken, node);
}
if (needToWake)
// First item in the queue. If the queue wasn't already empty, then we already sent
// the wakeup request.
if (((null != _connection) && (_connection.GetStatus() != TalkWithServer.Status.scsDisconnected))
|| (new DateTime() >= _restartTime))
// Either the conneciton is already good, or it's time to retry.
_connectionLifeCycle.WakeUpSoon();
}
///
/// Call this if you are no longer interested in a responses to a message.
///
/// The primary purpose of this is to clean up any memory associated with the
/// callbacks for this message.
///
/// Calling this does not guarantee that the callbacks will immediately cease. This
/// library is multi-threaded. The callbacks are not called from within a lock.
///
/// If you call this before the message is actually sent to the server, the message
/// might or might not be sent to the server.
///
///
/// A value that you previously sent to SendMessage().
///
/// Null is allowed. In that case we do nothing. That's a convenience. Otherwise
/// most callers would have to check for that themselves.
///
public void CancelMessage(TalkWithServer.CancelToken cancelToken)
{
if (null == cancelToken)
// No message to cancel.
return;
lock (_mutex)
{
LinkedListNode node;
if (_requestsWithCancel.TryGetValue(cancelToken, out node))
{ // Remove from our list.
_requestsWithCancel.Remove(cancelToken);
_allRequests.Remove(node);
}
}
// It's tempting to do the next line only if the previous if was not true.
// But it doesn't hurt to call TalkWithServer.CancelToken.Cancel(). That's
// fast. And it marks the cancelToken.IsCanceled property as true.
cancelToken.Cancel();
}
}
}