// ---------------------------------------------------------------------------
// TopListConnection
//
// Connection object through which TopListRequest and TopListHistoryRequest
// send their work.  It derives from ServerConnection64 and
// ThreadMonitor::Extra, and the out-of-class TopListRequest::send(),
// cancel(), logIn() and TopListHistoryRequest::send() at the bottom of this
// file simply forward to TopListConnection::instance().
//
// NOTE(review): this text appears to have lost its line breaks and several
// spans that sat between a '<' and a later '>' (the first #include has no
// header name, getInfoForThreadMonitor() runs straight into extractData(),
// and fragments such as "msg<second;" appear in several functions).  As
// stored, every "//" comment also extends to the end of its physical line.
// Restore the file from version control before changing any code here; the
// notes below describe only what is still visible.
//
// Client ids
//   * getNextClientId() bumps _lastClientId with __sync_add_and_fetch and
//     returns the new value.  _lastClientId starts at HISTORY_SHARED (2), so
//     the first id handed out is 3.
//   * LOGIN_RESPONSE is 1 and HISTORY_SHARED is 2; negative ids are passed
//     on to ServerConnection64::onMessage().
//
// Bookkeeping
//   * RequestInfoBase holds clientId (0 by default); name() is "tlr"
//     followed by ntoa(clientId).
//   * RequestInfo adds messageId and a TopListRequest pointer that its
//     destructor deletes; entries live in _byClientId, keyed by int64_t.
//   * HistoryRequestInfo holds a TopListHistoryRequest pointer that its
//     destructor deletes; entries live in _historyByName, keyed by
//     std::string.
//   * NOTE(review): both structs delete a raw pointer in the destructor but
//     show no copy/move control; confirm they are never copied.
//
// Callbacks
//   * onAbort(): looks the client id up in _byClientId; when found it counts
//     "onAbort() auto retry" and calls send(it->second, false), otherwise it
//     counts "onAbort() not found".
//   * onMessage(): dispatches on client id to ServerConnection64::onMessage,
//     onLoginResponse, onHistorySharedResponse or onTopListResponse; an
//     unknown positive id is only counted ("onMessage() not found").
//   * onNewConnection(): sends a "login" command carrying _remoteUserName and
//     _remotePassword with LOGIN_RESPONSE as the client id, then queues
//     sendHistoryListener() through doWorkInThread().
//   * shouldTryToConnect(): requires _initialized, at least one entry in
//     _byClientId or _historyByName, and non-empty user name and password.
//   * initializeInThread(): registers this object with ThreadMonitor::find().
//
// Response parsing
//   * extractData(): copies START_TIME / END_TIME (strtolDefault, default 0)
//     and appends allAttributes() of every child element to rows.
//   * extractMetaData(): copies SHORT_FORM to collaborate, WINDOW_NAME to
//     windowName, every child of COLUMNS to columns, and the FIELD attribute
//     of SORT_BY to sortBy.
//   * NOTE(review): both helpers build a SmarterP with NULL and dereference
//     it right away; presumably that constructor allocates a default
//     object -- confirm against SmarterP.
//   * onHistorySharedResponse() (start of body missing): TYPE "data" calls
//     request->onData, when set, and then erases the entry from
//     _historyByName; TYPE "info" calls request->onMetaData when set; any
//     other TYPE is only counted.
//   * onTopListResponse() (partly missing): for "data" it counts
//     "INVALID WINDOW" when requestInfo.name() differs from the WINDOW
//     attribute; after a metadata callback it sets request.skipMetaData to
//     OptionalBool::True.
//   * onLoginResponse() (partly missing): reads the STATE attribute of
//     ACCOUNT_STATUS and counts "onLoginResponse(good)" when it is "good".
//
// Sending
//   * The helper whose signature ends in "const &map)" joins the map as
//     urlEncode(key) '=' urlEncode(value) pairs separated by '&'.
//     NOTE(review): presumably this is the quote() used by send(); its name
//     is in the missing text.
//   * send(RequestInfo &, bool firstTime): asserts that messageId is 0
//     exactly when firstTime is true, builds an "ms_top_list_start" message
//     from the request fields and stores the id returned by sendMessage()
//     in requestInfo.messageId.
//   * cancel() (start missing): sends "ms_top_list_stop" with the request
//     name, calls cancelMessage(info.messageId) and erases the entry from
//     _byClientId.
//   * logIn(): inside invokeIfRequired() stores the credentials and calls
//     reset().
//   * instance(): returns a function-local static object created with new;
//     no matching delete is visible in this file.
// ---------------------------------------------------------------------------
#include #include "../shared/ServerConnection64.h" #include "../shared/GlobalConfigFile.h" #include "../shared/LogFile.h" #include "../shared/ThreadMonitor.h" #include "../shared/XmlReader.h" #include "TopListRequest.h" class TopListConnection : private ServerConnection64, ThreadMonitor::Extra { private: static int64_t _lastClientId; static int64_t getNextClientId() { return __sync_add_and_fetch(&_lastClientId, 1); } struct RequestInfoBase { int64_t clientId; RequestInfoBase() : clientId(0) { } static std::string name(int64_t clientId) { return "tlr" + ntoa(clientId); } std::string name() const { return name(clientId); } }; struct RequestInfo : public RequestInfoBase { MessageId messageId; TopListRequest *request; RequestInfo() : messageId(0), request(NULL) { } ~RequestInfo() { delete request; } bool empty() const { return !(clientId || messageId || request); } }; std::unordered_map< int64_t, RequestInfo > _byClientId; struct HistoryRequestInfo : public RequestInfoBase { TopListHistoryRequest *request; HistoryRequestInfo() : request(NULL) { } ~HistoryRequestInfo() { delete request; } bool empty() const { return !(clientId || request); } }; std::unordered_map< std::string, HistoryRequestInfo > _historyByName; bool _initialized; std::string _remoteUserName; std::string _remotePassword; // From TalkWithServer64::IMessageListener via ServerConnection64 virtual void onAbort(int64_t clientId, MessageId messageId) { ThreadMonitor &tm = ThreadMonitor::find(); const auto it = _byClientId.find(clientId); if (it == _byClientId.end()) { tm.increment("onAbort() not found"); return; } tm.increment("onAbort() auto retry"); send(it->second, false); } static const int64_t LOGIN_RESPONSE; static const int64_t HISTORY_SHARED; // From TalkWithServer64::IMessageListener via ServerConnection64 virtual void onMessage(std::string bytes, int64_t clientId, MessageId messageId) { if (clientId < 0) ServerConnection64::onMessage(bytes, clientId, messageId); else if (clientId == LOGIN_RESPONSE) 
onLoginResponse(bytes); else if (clientId == HISTORY_SHARED) onHistorySharedResponse(bytes); else { ThreadMonitor &tm = ThreadMonitor::find(); const auto it = _byClientId.find(clientId); if (it == _byClientId.end()) tm.increment("onMessage() not found"); else { tm.increment("onMessage()"); onTopListResponse(bytes, it->first); } } } // From ServerConnection64 virtual void onNewConnection(TalkWithServer64 *connection) { Message message; message["command"] = "login"; message["username"] = _remoteUserName; message["password"] = _remotePassword; connection->sendMessage(message, this, LOGIN_RESPONSE, /*streaming*/ true); doWorkInThread([=]() { // It seems easier to do this outside of the onNewConnection() // callback. I'm not 100% sure that was necessary. But doing this // later means we have a lot fewer assumptions. sendHistoryListener(); }); } // ServerConnection64 virtual bool shouldTryToConnect() { return _initialized && ((!_byClientId.empty()) || (!_historyByName.empty())) && (!_remoteUserName.empty()) && (!_remotePassword.empty()); } // From IContainerThread virtual void initializeInThread() { ThreadMonitor::find().add(this); } // From ThreadMonitor::Extra virtual std::string getInfoForThreadMonitor() { TclList result; result<<"TopListConnection" < extractData(RapidXmlElement element) { SmarterP< TopListRequest::Data > data(NULL); data->start = strtolDefault(element.attribute("START_TIME"), 0); data->end = strtolDefault(element.attribute("END_TIME"), 0); for (RapidXmlElement rowElement : element) data->rows.emplace_back(rowElement.allAttributes()); return data; } static SmarterCP< TopListRequest::MetaData > extractMetaData(RapidXmlElement element) { SmarterP< TopListRequest::MetaData > metaData(NULL); metaData->collaborate = element.attribute("SHORT_FORM"); metaData->windowName = element.attribute("WINDOW_NAME"); for (RapidXmlElement columnElement : element["COLUMNS"]) metaData->columns.emplace_back(columnElement.allAttributes()); metaData->sortBy = 
element["SORT_BY"].attribute("FIELD"); return metaData; } void onHistorySharedResponse(std::string const &bytes) { ThreadMonitor::SetState tm("onHistorySharedResponse()"); //TclList msg; //msg<second; const std::string type = element.attribute("TYPE"); if (type == "data") { tm.increment("onHistorySharedResponse() data"); if (auto const &callback = requestInfo.request->onData) callback(extractData(element), requestInfo.clientId); _historyByName.erase(it); } else if (type == "info") { if (auto const &callback = requestInfo.request->onMetaData) { tm.increment("onHistorySharedResponse() info"); callback(extractMetaData(element), requestInfo.clientId); } else tm.increment("onHistorySharedResponse() info skipped"); } else tm.increment("onHistorySharedResponse() TYPE invalid"); } // This might delete the request from _byClientId. void onTopListResponse(std::string const &bytes, int64_t clientId) { ThreadMonitor::SetState tm("onTopListResponse()"); //LogFile::primary().sendString(TclList()<second; TopListRequest &request = *requestInfo.request; if (type == "data") { tm.increment("onTopListResponse() data"); if (requestInfo.name() != element.attribute("WINDOW")) { // This should never happen. This is almost an asertion failure. tm.increment("onTopListResponse() INVALID WINDOW"); //LogFile::primary().sendString(TclList()<second.request->onMetaData) { tm.increment("onTopListResponse() info found"); callback(extractMetaData(element), clientId); request.skipMetaData = TopListRequest::OptionalBool::True; } else tm.increment("onTopListResponse() info skipped"); } else { tm.increment("onTopListResponse() TYPE invalid"); } //TclList msg; //msg< // // Any normal client should look for the DISCONNECT_FOR_GOOD flag. // We're ignoring this semi-on-purpose. If you accidentally use this // same account on TI Pro, this server should try again and kick you // off. // // // I wish there was a better way to handle this. And the log file gets // spammed with these. 
:( // RapidXmlDocument document; document.loadFromString(bytes); RapidXmlElement element = document.root()["ACCOUNT_STATUS"]; const std::string state = element.attribute("STATE"); if (state == "good") tm.increment("onLoginResponse(good)"); else { TclList msg; msg< const &map) { std::string result; for (auto const &kvp : map) { if (!result.empty()) result += '&'; result += urlEncode(kvp.first); result += '='; result += urlEncode(kvp.second); } return result; } void send(RequestInfo &requestInfo, bool firstTime) { assert((requestInfo.messageId == 0) == firstTime); TopListRequest &request = *requestInfo.request; Message message; message["command"] = "ms_top_list_start"; message["name"] = requestInfo.name(); addOrNull(message, "collaborate", request.collaborate); addOrNull(message, "skip_metadata", request.skipMetaData); addOrNull(message, "use_database", request.useDatabase); message["last_update"] = ntoa(request.lastUpdate); message["streaming"] = request.streaming?'1':'0'; addOrNull(message, "save_to_mru", request.saveToMru); addOrNull(message, "collaborate_columns", request.collaborateColumns); addOrNull(message, "outside_market_hours", request.outsideMarketHours); message["result_count"] = ntoa(request.resultCount); addOrNull(message, "single_symbol", request.singleSymbol); addOrNull(message, "sort_formula", request.sortFormula); addOrNull(message, "where_formula", request.whereFormula); addOrNull(message, "extra_column_formulas", quote(request.extraColumnFormulas)); requestInfo.messageId = sendMessage(message, this, requestInfo.clientId, request.streaming); //TclList msg; //msg<second; Message message; message["command"] = "ms_top_list_stop"; message["name"] = info.name(); sendMessage(message); cancelMessage(info.messageId); _byClientId.erase(it); } }); } void logIn(std::string const &userName, std::string const &password) { invokeIfRequired([=]() { _remoteUserName = userName; _remotePassword = password; reset(); }); } static inline TopListConnection &instance() 
{ static TopListConnection *result = new TopListConnection(); return *result; } }; // Client id -1 is used for pings. // In fact, all negative numbers are reserved for ServerConnection64. // Client id 0 is reserved for empty variables. // It's a good default for strtolDefault(). // The login request will come back with client id 1. // The data (but not the metadata) for all history requests comes back with // client id 2. This made some sense at one time, but the newer APIs all // try to avoid sharing any message ids. // All future requests will get the next number starting at 3. // Each top list request gets its own client id. // For history requests, only the meta data is associated with this id. const int64_t TopListConnection::LOGIN_RESPONSE = 1; const int64_t TopListConnection::HISTORY_SHARED = 2; int64_t TopListConnection::_lastClientId = HISTORY_SHARED; int64_t TopListRequest::send() { return TopListConnection::instance().send(this); } void TopListRequest::cancel(int64_t id) { TopListConnection::instance().cancel(id); } void TopListRequest::logIn(std::string const &userName, std::string const &password) { TopListConnection::instance().logIn(userName, password); } int64_t TopListHistoryRequest::send() { return TopListConnection::instance().send(this); }