using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using TradeIdeas.ServerConnection;
using TradeIdeas.MiscSupport;
namespace TradeIdeas.TIQData
{
///
/// This handles the data that was available to the Delphi TIQ client. We still use that format
/// for a lot of things. In particular, sending alerts. The old TIQ server will not be rewritten
/// any time soon. And the new TIQ (tikiller) server will send a lot of similar data.
///
/// These messages are called alerts, however we actually send a lot of data this way. Some things
/// are more like normal top lists. The format is very flexible and you can send almost anything.
///
/// Part of this was abstracted from TMainConnection in MainConnectionUnit.pas. In particular, the
/// names of the invidiual commands for the server came from there. A lot of that Pascal code has
/// been moved to other parts of ConnectionMaster in the C# code.
///
/// The overall structure for this class was inspired more by StreamingResponseManager in the C#
/// code. In particular, all of the C# code is thread safe while the Delphi code assumes that
/// most of the work will happen in the GUI thread.
///
public class TraditionalAlertsManager
{
public delegate void OnResponse(String category, String message);
private Object _mutex = new object();
///
/// This is protected by _mutex.
///
private readonly Dictionary _subscriptions = new Dictionary();
private ConnectionLifeCycle _connectionLifeCycle;
internal TraditionalAlertsManager(ConnectionLifeCycle connectionLifeCycle)
{
_connectionLifeCycle = connectionLifeCycle;
_connectionLifeCycle.OnConnection += OnConnect;
}
private TalkWithServer _initializedServerConnection;
private void OnConnect(ConnectionLifeCycle sender, TalkWithServer serverConnection)
{
if (sender.Status == ConnectionLifeCycle.ConnectionStatus.Full)
serverConnection.SendMessage(
TalkWithServer.CreateMessage("command", "listen_for_alerts"), Response, true);
lock (_mutex)
{
_initializedServerConnection = serverConnection;
foreach (var nvp in _subscriptions)
serverConnection.SendMessage(
TalkWithServer.CreateMessage("command", "add_alert_category", "category", nvp.Key));
}
}
///
/// Request data.
/// This is thread safe. You can call it from any thread.
/// Note: You can request multiple callbacks for a given category. That works well.
/// Requesting the same callback/category pair more than once is not well defined.
/// Currently we used a single value of type OnResponse and use the + operator on it.
/// I believe that allows duplicates. That might chane to a HashSet of OnResponse
/// objects. That would ignore duplicates.
///
/// What to subscribe to.
///
/// Where to send the responses. This might be called from any thread.
/// This should not throw an exception. This object has some protection against that
/// but the callback function can do a better job.
///
public void Subscribe(string category, OnResponse callback)
{
lock (_mutex)
{
OnResponse callbackList;
if (_subscriptions.TryGetValue(category, out callbackList))
// The server is already sending us data. Add one more internal listener.
_subscriptions[category] = callbackList + callback;
else
{ // This is the first request for this category. Forward the request to the server.
_subscriptions[category] = callback;
if (null != _initializedServerConnection)
_initializedServerConnection.SendMessage(
TalkWithServer.CreateMessage("command", "add_alert_category", "category", category));
}
}
}
///
/// Remove a subscription.
/// This is thread safe and can be called from any thread.
/// As is typical with a thread safe callback, this does not gaurentee that the callbacks will
/// stop immediately. However, you should still call this to make sure that resources are
/// (eventually) disposed of.
/// If there is no such subscription, that's acceptable. No error is generated.
///
/// The same as in the call to .
/// The same as in the call to .
public void Unsubscribe(string category, OnResponse callback)
{
lock (_mutex)
{
OnResponse callbackList;
if (_subscriptions.TryGetValue(category, out callbackList))
{
callbackList -= callback;
if (null == callbackList)
{ // Tell the server to stop sending us this data.
_subscriptions.Remove(category);
if (null != _initializedServerConnection)
_initializedServerConnection.SendMessage(
TalkWithServer.CreateMessage("command", "delete_alert_category", "category", category));
}
else
// There are other internal listeners.
_subscriptions[category] = callbackList;
}
}
}
private static readonly string[] FIELD_SEPERATOR = new string[] { "\r\n" };
private void Response(byte[] body, object serverConnection)
{
if (null == body)
{ // The connection was broken.
lock (_mutex)
{
if (_initializedServerConnection == serverConnection)
// Don't try to use this connection any more.
_initializedServerConnection = null;
}
}
else
{
string wholeMessage = TalkWithServer.BytesToStr(body);
string[] pieces = wholeMessage.Split(FIELD_SEPERATOR, 2, StringSplitOptions.None);
string category = pieces[0];
string message = "";
if (pieces.Length > 1)
message = pieces[1];
SendMessage(category, message);
}
}
///
/// Send a message to anyone who has subscribed to that message.
/// This is public for the sake of sending debug data to the client.
/// Normally this is called by this class when data comes from the server.
/// This can be called from any thread.
///
///
///
public void SendMessage(string category, string message)
{
// It's tempting to make our own HashSet of OnResponse items. That would give us
// more control. We could, for example, wrap each callback in it's own exception
// handler. Now, if own listener fails, we skip the rest. Of course, that means
// we'd need to copy the hash set each time. OnResponse is currently a read-
// only object, so no copies are required.
OnResponse callbackList;
lock (_mutex)
{
_subscriptions.TryGetValue(category, out callbackList);
}
if (null != callbackList)
try
{
callbackList(category, message);
}
catch { }
}
private static readonly char[] EQUALS = new char[] { '=' };
///
/// The messages can have any formation. But it is very common to encode an alert like a url.
/// This function works like http://php.net/manual/en/function.parse-str.php
///
///
/// An encoded string from the server.
/// Something like "symbol=DELL&price=23.45&description=New+high+price".
///
/// The decoded version of url string.
public static RowData ParseOneRow(string raw)
{
//Dictionary result = new Dictionary();
RowData result = new RowData();
string [] pairs = raw.Split('&');
foreach (string pair in pairs)
{
string [] pieces = pair.Split(EQUALS, 2);
if (pieces.Length == 2)
result.Data[Uri.UnescapeDataString(pieces[0])] = Uri.UnescapeDataString(pieces[1]);
}
return result;
}
}
}