using System;
using System.Collections.Generic;
using TradeIdeas.MiscSupport;
using TradeIdeas.ServerConnection;
using TradeIdeas.TIProData.Interfaces;
namespace TradeIdeas.TIProData
{
///
/// This handles the data that was available to the Delphi TIQ client. We still use that format
/// for a lot of things. In particular, sending alerts. The old TIQ server will not be rewritten
/// any time soon. And the new TIQ (tikiller) server will send a lot of similar data.
///
/// These messages are called alerts, however we actually send a lot of data this way. Some things
/// are more like normal top lists. The format is very flexible and you can send almost anything.
///
/// Part of this was abstracted from TMainConnection in MainConnectionUnit.pas. In particular, the
/// names of the invidiual commands for the server came from there. A lot of that Pascal code has
/// been moved to other parts of IConnectionMaster in the C# code.
///
/// The overall structure for this class was inspired more by StreamingResponseManager in the C#
/// code. In particular, all of the C# code is thread safe while the Delphi code assumes that
/// most of the work will happen in the GUI thread.
///
public class TraditionalAlertsManager
{
public delegate void OnResponse(String category, String message);
private Object _mutex = new object();
///
/// This is protected by _mutex.
///
private readonly Dictionary _subscriptions = new Dictionary();
private readonly ISendManager _sendManager;
internal TraditionalAlertsManager(ISendManager sendManager)
{
_sendManager = sendManager;
}
private object _connection;
///
/// Create a new connection to the alert server, if required.
/// This might do nothing because no connection is required.
/// This might do nothing because there is already a connection.
/// This might send a "listen" command to get results from that server.
///
///
/// If this is true definitely want a connection.
/// If this is false, the method will check if there are already pending requests.
///
private void CheckListenStatus(bool addInProgress)
{
lock (_mutex)
{
if (null != _connection) return;
if ((_subscriptions.Count == 0) && !addInProgress) return;
_connection = new object();
_sendManager.SendMessage(TalkWithServer.CreateMessage("command", "listen_for_alerts"), Response, true, _connection);
foreach (var nvp in _subscriptions)
_sendManager.SendMessage(
TalkWithServer.CreateMessage("command", "add_alert_category", "category", nvp.Key));
}
}
///
/// Request data.
/// This is thread safe. You can call it from any thread.
/// Note: You can request multiple callbacks for a given category. That works well.
/// Requesting the same callback/category pair more than once is not well defined.
/// Currently we used a single value of type OnResponse and use the + operator on it.
/// I believe that allows duplicates. That might chane to a HashSet of OnResponse
/// objects. That would ignore duplicates.
///
/// What to subscribe to.
///
/// Where to send the responses. This might be called from any thread.
/// This should not throw an exception. This object has some protection against that
/// but the callback function can do a better job.
///
public void Subscribe(string category, OnResponse callback)
{
lock (_mutex)
{
OnResponse callbackList;
CheckListenStatus(addInProgress: true);
if (_subscriptions.TryGetValue(category, out callbackList))
// The server is already sending us data. Add one more internal listener.
_subscriptions[category] = callbackList + callback;
else
{ // This is the first request for this category. Forward the request to the server.
_subscriptions[category] = callback;
if (null != _connection)
_sendManager.SendMessage(
TalkWithServer.CreateMessage("command", "add_alert_category", "category", category));
}
}
}
///
/// Remove a subscription.
/// This is thread safe and can be called from any thread.
/// As is typical with a thread safe callback, this does not gaurentee that the callbacks will
/// stop immediately. However, you should still call this to make sure that resources are
/// (eventually) disposed of.
/// If there is no such subscription, that's acceptable. No error is generated.
///
/// The same as in the call to .
/// The same as in the call to .
public void Unsubscribe(string category, OnResponse callback)
{
lock (_mutex)
{
OnResponse callbackList;
if (_subscriptions.TryGetValue(category, out callbackList))
{
callbackList -= callback;
if (null == callbackList)
{ // Tell the server to stop sending us this data.
_subscriptions.Remove(category);
if (null != _connection)
_sendManager.SendMessage(
TalkWithServer.CreateMessage("command", "delete_alert_category", "category", category));
}
else
// There are other internal listeners.
_subscriptions[category] = callbackList;
}
}
}
private static readonly string[] FIELD_SEPERATOR = new string[] { "\r\n" };
private void Response(byte[] body, object serverConnection)
{
if (_connection != serverConnection)
// Old message.
return;
if (null == body)
{ // The connection was broken.
lock (_mutex)
{
if (_connection != serverConnection)
// Old message.
return;
_connection = null;
CheckListenStatus(addInProgress: false);
}
}
else
{
string wholeMessage = TalkWithServer.BytesToStr(body);
string[] pieces = wholeMessage.Split(FIELD_SEPERATOR, 2, StringSplitOptions.None);
string category = pieces[0];
string message = "";
if (pieces.Length > 1)
message = pieces[1];
SendMessage(category, message);
}
}
///
/// Send a message to anyone who has subscribed to that message.
/// This is public for the sake of sending debug data to the client.
/// Normally this is called by this class when data comes from the server.
/// This can be called from any thread.
///
///
///
public void SendMessage(string category, string message)
{
// It's tempting to make our own HashSet of OnResponse items. That would give us
// more control. We could, for example, wrap each callback in it's own exception
// handler. Now, if own listener fails, we skip the rest. Of course, that means
// we'd need to copy the hash set each time. OnResponse is currently a read-
// only object, so no copies are required.
OnResponse callbackList;
lock (_mutex)
{
_subscriptions.TryGetValue(category, out callbackList);
}
if (null != callbackList)
try
{
callbackList(category, message);
}
catch { }
}
private static readonly char[] EQUALS = new char[] { '=' };
///
/// The messages can have any formation. But it is very common to encode an alert like a url.
/// This function works like http://php.net/manual/en/function.parse-str.php
///
///
/// An encoded string from the server.
/// Something like "symbol=DELL&price=23.45&description=New+high+price".
///
/// The decoded version of url string.
public static RowData ParseOneRow(string raw)
{
//Dictionary result = new Dictionary();
RowData result = new RowData();
string [] pairs = raw.Split('&');
foreach (string pair in pairs)
{
string [] pieces = pair.Split(EQUALS, 2);
if (pieces.Length == 2)
result.Data[Uri.UnescapeDataString(pieces[0])] = Uri.UnescapeDataString(pieces[1]);
}
return result;
}
}
}