#include "../fast_alert_search/history_server/Marshal.h" #include "MarketDataProxyClient.h" ///////////////////////////////////////////////////////////////////// // MarketDataProxyClient::ServerConnection::RequestInfo ///////////////////////////////////////////////////////////////////// void MarketDataProxyClient::ServerConnection::RequestInfo::onMessage (std::string bytes, int64_t clientId, MessageId messageId) { owner->onMessage(*this, bytes); } void MarketDataProxyClient::ServerConnection::RequestInfo::onAbort (int64_t clientId, MessageId messageId) { //std::cout<<"onAbort("<onAbort(*this); } ///////////////////////////////////////////////////////////////////// // MarketDataProxyClient::ServerConnection ///////////////////////////////////////////////////////////////////// // This is overkill. A C++ optimization just because we can. Instead of // creating a new object every time we reuse this object. We could always // change createSubscribeMessage() to return a new object every time, or we // could change this to a thread local variable, if it ever came up. static ServerConnection64::Message subscribeMessage = {{"command", "request_from_proxy"}, {"type", ""}, {"symbol", ""}}; static std::string &subscribeType = subscribeMessage["type"]; static std::string &subscribeSymbol = subscribeMessage["symbol"]; //"request_from_proxy", "cancel_from_proxy", "debug_dump_from_proxy" MarketDataProxyClient::Message const & MarketDataProxyClient::ServerConnection::createSubscribeMessage(DataTypes::Internal type, std::string const &symbol) { subscribeType = DataTypes::toString(type, true); subscribeSymbol = symbol; return subscribeMessage; } MarketDataProxyClient::ServerConnection::ServerConnection (std::string const &name, MarketDataProxyClient *owner) : ServerConnection64(name, owner->getContainer()), _owner(owner) { getAndParseAddress("market_data_proxy_address"); start(); } static ServerConnection64::Message unsubscribeMessage = {{"command", "cancel_from_proxy"}, {"type", ""}, {"symbol", ""}}; static std::string &unsubscribeType = unsubscribeMessage["type"]; static std::string &unsubscribeSymbol = unsubscribeMessage["symbol"]; MarketDataProxyClient::Message const & MarketDataProxyClient::ServerConnection::createUnSubscribeMessage(DataTypes::Internal type, std::string const &symbol) { unsubscribeType = DataTypes::toString(type, true); unsubscribeSymbol = symbol; return unsubscribeMessage; } void MarketDataProxyClient::ServerConnection::requestStatus (IMessageListener *listener, int64_t clientId) { sendMessage({{"command", "debug_dump_from_proxy"}}, listener, clientId, false); } void MarketDataProxyClient::ServerConnection::cancel(DataTypes::Internal type, std::string const &symbol) { auto &bySymbol = _requests[type]; const auto it = bySymbol.find(symbol); if (it == bySymbol.end()) return; cancelMessage(it->second.messageId); bySymbol.erase(it); auto const &message = createUnSubscribeMessage(type, symbol); sendMessage(message); } void MarketDataProxyClient::ServerConnection::subscribe(DataTypes::Internal type, std::string const &symbol) { RequestInfo &requestInfo = _requests[type][symbol]; if (requestInfo.messageId) return; // Already subscribed. requestInfo.type = type; requestInfo.symbol = symbol; requestInfo.owner = this; sendSubscribe(requestInfo); } void MarketDataProxyClient::ServerConnection::onMessage(RequestInfo const &request, std::string bytes) { if (!_owner->_callback) return; std::string symbol; std::string body; if (request.symbol.empty()) try { // Wildcard request size_t offset = 0; unmarshal(bytes, offset, symbol); body = unmarshalRemainder(bytes, offset); } catch (MarshallingException const &ex) { // TODO send to log file or thread monitor. return; } else { // Request for a specific symbol symbol = request.symbol; body = bytes; } _owner->_callback(request.type, symbol, body); } void MarketDataProxyClient::ServerConnection::onAbort(RequestInfo &request) { //std::cout<<"onAbort(RequestInfo &)"<