#include "../shared/CommandDispatcher.h"
#include "../shared/ContainerThread.h"
#include "../shared/GlobalConfigFile.h"
#include "../shared/ReplyToClient.h"
#include "../shared/NewWorkerCluster.h"
#include "../shared/ThreadSafeRefCount.h"

#include "CandleDataNode.h"
#include "CandleServer.h"


/////////////////////////////////////////////////////////////////////
// CandleServerMainThread
//
// Listens for the candle commands, turns each one into a DataRequest, and
// routes that request through the data node thread and a worker cluster.
/////////////////////////////////////////////////////////////////////

class CandleServerMainThread : ForeverThreadUser
{
protected:
  // Handles the commands this object registered for.
  virtual void handleRequestInThread(Request *original);

private:
  // Callback ids handed to the command dispatcher, one per command.
  enum { mtTodaysCandles, mtIntradayCandle, mtDailyCandle };

  // Declaration order matters:  _cluster is initialized before
  // _dataNodeThread.
  NewWorkerCluster _cluster;
  DataNodeThread *const _dataNodeThread;

  using Times = std::vector< time_t >;
  static Times parseTimes(std::string const &asString);

  // Some code shared between TodaysCandlesRequest and CandleRequest.
  class DataRequest
  {
  protected:
    DataNodeThread *const _dataNodeThread;
    // Where the reply goes, and the id the client expects on that reply.
    SocketInfo *const _socket;
    const ExternalRequest::MessageId _clientMessageId;
    // Looked up once in the constructor; one reference is given back in the
    // destructor.
    CandleDataNode *const _candleDataNode;

    // Reads everything we need out of source.  source is not kept.
    // intraday selects which data node CandleDataNode::find() looks for.
    DataRequest(DataNodeThread *dataNodeThread,
                ExternalRequest *source,
                bool intraday);

    // The actual work.  Called from a worker thread, see send().
    virtual void doWork() const =0;

  public:
    // Various Request objects will hold this data at various times.  We don't
    // create special classes for those Request objects, so we don't have any
    // control over their destructors.  Instead, we submit a lambda expression
    // and some library code does the rest.  Making this object reference
    // counted will ensure that it's available in the lambda expressions when
    // it's needed, and it's deleted when it's not needed any more.  (The
    // thread safe part may be overkill, but it doesn't hurt anything.)
    using Ref = TSRefCount< DataRequest >;

    // The destructor gives back exactly one CandleDataNode reference, so a
    // copy of this object would give back a second one that nobody took.
    // Share the object through Ref instead of copying it.
    DataRequest(DataRequest const &) = delete;
    DataRequest &operator =(DataRequest const &) = delete;

    // Send this request to the data node thread, back to the dispatcher
    // thread, into the queue, and finally to a worker thread.  In the worker
    // thread we'll call doWork().
    static void send(Ref ref,
                     NewWorkerCluster *cluster,
                     IContainerThread *thread);

    virtual ~DataRequest();
  };

  // All candles from today.  Typically a server will keep all candles from
  // today in memory, and keep building them from individual prints.  Older
  // candles will go to the database.  This request allows a server to
  // bootstrap itself from another server which has been listening all day.
  class TodaysCandlesRequest : public DataRequest
  {
  protected:
    void doWork() const override;
  public:
    TodaysCandlesRequest(DataNodeThread *dataNodeThread,
                         ExternalRequest *source);
  };

  // Candles for one or more (start, end) time ranges.  The same class serves
  // the intraday and the daily commands; intraday says which one this is.
  class CandleRequest : public DataRequest
  {
  private:
    // Unparsed, straight from the request.  Parsed in doWork().
    const std::string _start;
    const std::string _end;
    const bool _intraday;
  protected:
    void doWork() const override;
  public:
    CandleRequest(DataNodeThread *dataNodeThread,
                  ExternalRequest *source,
                  bool intraday);
  };

public:
  CandleServerMainThread(int threadCount, DataNodeThread *dataNodeThread);
  ~CandleServerMainThread();
};


/////////////////////////////////////////////////////////////////////
// CandleServerMainThread::DataRequest
/////////////////////////////////////////////////////////////////////

// Copies the socket and the response message id out of source and looks up
// the data node for the request's "symbol" property.  source must not be
// null;  it is dereferenced without a check.
CandleServerMainThread::DataRequest::DataRequest
(DataNodeThread *dataNodeThread, ExternalRequest *source, bool intraday) :
  _dataNodeThread(dataNodeThread),
  _socket(source->getSocketInfo()),
  _clientMessageId(source->getResponseMessageId()),
  _candleDataNode(CandleDataNode::find(source->getProperty("symbol"),
                                       intraday))
{
}

CandleServerMainThread::DataRequest::~DataRequest()
{
  // Put this in the destructor so we don't miss it, even if we get
  // deleted early.  That could happen if the socket gets disconnected.
  _candleDataNode->decrementReferenceCount();
}

// We handle several types of data requests.  They all follow the same
// circuitous path.
// 1) In the main thread, read the relevant details from the external request.
// 2) In the main thread, request the data that we need.
// 3) Go to the data node thread, to make sure the data request was turned
//    on.
// 4) Go back to the main thread to be queued up for a worker thread.
// 5) Go to a worker thread to read the data and send a response to the
//    client.
// 6) Go back to the main thread, to tell that thread that the worker thread
//    is available again.
// The constructor for CandleServerMainThread::DataRequest handles 1 & 2.  This
// function handles the rest.
//
// ref is captured by value in every nested lambda, so the request stays
// referenced until the innermost lambda is gone.  Each hop is also handed
// ref->_socket.
// NOTE(review): presumably the socket lets the library drop pending work
// when the client disconnects -- confirm against submit(),
// addLambdaToQueue() and addJob1().
void CandleServerMainThread::DataRequest::send(Ref ref,
                                               NewWorkerCluster *cluster,
                                               IContainerThread *thread)
{
  ref->_dataNodeThread->
    submit(ref->_socket, [=]() {
        // Step 4:  hand the request to the container thread passed in.
        thread->addLambdaToQueue([=]() {
            // Step 5:  queue the job for a worker, which calls doWork().
            cluster->addJob1([=]() { ref->doWork(); }, ref->_socket);
          }, ref->_socket);
      });
}


/////////////////////////////////////////////////////////////////////
// CandleServerMainThread::TodaysCandlesRequest
/////////////////////////////////////////////////////////////////////

// Always asks the base class for the intraday data node.
CandleServerMainThread::TodaysCandlesRequest::TodaysCandlesRequest
(DataNodeThread *dataNodeThread, ExternalRequest *source) :
  DataRequest(dataNodeThread, source, true)
{
}

// Marshal whatever the data node has for today and queue it for the client
// under the message id taken from the original request.
void CandleServerMainThread::TodaysCandlesRequest::doWork() const
{
  const std::string result = _candleDataNode->marshalTodaysCandles();
  addToOutputQueue(_socket, result, _clientMessageId);
}


/////////////////////////////////////////////////////////////////////
// CandleServerMainThread::CandleRequest
/////////////////////////////////////////////////////////////////////

// Convert the text form of a time into a time_t.
// NOTE(review): the 0 is presumably what strtollDefault() returns when the
// string can't be parsed -- confirm.
inline time_t importTime(std::string const &asString)
{
  // ../ax_alert_server/FormatTime.h has a different way to do the conversion.
  // Probably a moot point.
  return strtollDefault(asString, 0);
}

// Keep the "start" and "end" properties as text;  they are parsed later, in
// doWork().
// NOTE(review): the second argument to getProperty() looks like a fallback,
// so a request carrying only "date" would use it for both ends -- confirm.
CandleServerMainThread::CandleRequest::CandleRequest
(DataNodeThread *dataNodeThread, ExternalRequest *source, bool intraday) :
  DataRequest(dataNodeThread, source, intraday),
  _start(source->getProperty("start", source->getProperty("date"))),
  _end(source->getProperty("end", source->getProperty("date"))),
  _intraday(intraday)
{
}

// Pair up the start and end times, ask the data node for the matching
// candles, and send them to the client.  A reply is sent even when the
// request is rejected;  in that case the body is empty.
void CandleServerMainThread::CandleRequest::doWork() const
{
  std::string response;
  auto startTimes = parseTimes(_start);
  auto endTimes = parseTimes(_end);
  // We need at least one range, and every start needs its own end.
  if (startTimes.empty() || (startTimes.size() != endTimes.size()))
    // NOTE(review): this counter says "Intraday", but the daily command goes
    // through this same code.
    ThreadMonitor::find().increment("IntradayCandleRequest_ERROR");
  else
    {
      AllRowTimes requests;
      requests.reserve(startTimes.size());
      // The two lists are the same size here, so walking them in lock step
      // is safe.
      for (auto startIt = startTimes.begin(), endIt = endTimes.begin();
           startIt != startTimes.end();
           startIt++, endIt++)
        requests.emplace_back(*startIt, *endIt);
      SingleCandle::ByStartTime results;
      _candleDataNode->threadSafeGet(requests, results);
      // Append each candle to the response, in the container's own order.
      for (auto it = results.cbegin(); it != results.end(); it++)
        it->second.marshal(it->first, response);
    }
  addToOutputQueue(_socket, response, _clientMessageId);
  // NOTE(review): source text is missing immediately after "debug<" below.
  // Not visible from here on: the rest of this commented out debug code and
  // its closing marker, the end of this function, the definition of
  // parseTimes() (declared in the class and called above), and the start of
  // handleRequestInThread() including the declaration of tm.  What follows
  // the gap reads like the tail of a switch on the request's callbackId.
  // Restore this region from version control;  it is left exactly as found.
  /*
  TclList debug;
  debug<callbackId)
    {
    case mtTodaysCandles:
      {
        tm.increment("mtTodaysCandles");
        tm.setState("mtTodaysCandles");
        // NOTE(review): the result of this cast is used without a null
        // check, and the DataRequest constructor dereferences it.  The same
        // applies to the two cases below.
        ExternalRequest *request = dynamic_cast< ExternalRequest * >(original);
        DataRequest::send(new TodaysCandlesRequest(_dataNodeThread, request),
                          &_cluster, getContainer());
        break;
      }
    case mtIntradayCandle:
      {
        tm.increment("mtIntradayCandle");
        tm.setState("mtIntradayCandle");
        ExternalRequest *request = dynamic_cast< ExternalRequest * >(original);
        // true selects the intraday data node.
        DataRequest::send(new CandleRequest(_dataNodeThread, request, true),
                          &_cluster, getContainer());
        break;
      }
    case mtDailyCandle:
      {
        tm.increment("mtDailyCandle");
        tm.setState("mtDailyCandle");
        ExternalRequest *request = dynamic_cast< ExternalRequest * >(original);
        // false selects the daily (not intraday) data node.
        DataRequest::send(new CandleRequest(_dataNodeThread, request, false),
                          &_cluster, getContainer());
        break;
      }
    }
}
CandleServerMainThread::CandleServerMainThread (int threadCount, DataNodeThread *dataNodeThread) : ForeverThreadUser(IContainerThread::create("Candle Server Main Thread")), _cluster(getContainer(), "Candle Server"), _dataNodeThread(dataNodeThread) { _cluster.createWorkers(threadCount); CommandDispatcher *const cd = CommandDispatcher::getInstance(); cd->listenForCommand("get_todays_candles", this, mtTodaysCandles); cd->listenForCommand("get_intraday_candle", this, mtIntradayCandle); cd->listenForCommand("get_daily_candle", this, mtDailyCandle); start(); } CandleServerMainThread::~CandleServerMainThread() { assert(false && "Not Implemented!"); } ///////////////////////////////////////////////////////////////////// // Global ///////////////////////////////////////////////////////////////////// void initCandleServer(DataNodeThread *dataNodeThread) { const int threadCount = strtolDefault(getConfigItem("candle_server_threads"), 0); if (threadCount > 0) new CandleServerMainThread(threadCount, dataNodeThread); }