// NOTE(review): in this copy of the file the nine system headers named by the
// bare '#include' directives below appear to have been lost.  It looks like
// every '<...>' sequence was stripped during extraction, which also damaged a
// few expressions further down (see the other NOTE(review) comments).
// Restore them from version control.
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include "AlertSqlProducer.h"
#include "UserInfo.h"
#include "../shared/CommandDispatcher.h"
#include "../shared/DatabaseSupport.h"
#include "../shared/XmlSupport.h"
#include "../shared/ReplyToClient.h"
#include "../shared/SocketInfo.h"
#include "DatabaseThreadShared.h"
#include "ShortHistory.h"
#include "../shared/MiscSupport.h"
#include "HistoryHandler.h"
#include "../shared/LogFile.h"
#include "AlertConfig.h"
#include "StreamingAlertsThread.h"
#include "../shared/ThreadClass.h"
#include "../shared/GlobalConfigFile.h"
#include "MiscThread.h"
#include "Types.h"
#include "UserRequestControl.h"


/////////////////////////////////////////////////////////////////////
// Global
/////////////////////////////////////////////////////////////////////


/////////////////////////////////////////////////////////////////////
// WorkerThreadList
//
// Pool of StreamingAlertsThread workers, one per comma-separated entry in
// the "worker_databases" config item.  QueryInfo::startRequests() pops an
// idle worker for each get_alerts request that is ready to run.  The main
// thread releases the worker when the worker's reply (mtRealTimeData)
// comes back, or when the socket is deleted.
/////////////////////////////////////////////////////////////////////

class WorkerThreadList : NoAssign, NoCopy
{
private:
  // These are the workers available to do work.  This is a fifo, not a
  // stack or other structure, for two reasons:
  // 1) If there is a problem with just one worker, we are sure to see it
  // even if we are testing with just a single client or in the middle of
  // the night when all queries are quick.
  // 2) When we are busy, we will send work to each database in an
  // approximately fixed, predetermined percentage.  This allows us to
  // send more work to more powerful machines, and less to less powerful
  // machines or to machines which are being shared with other tasks.
  std::queue< StreamingAlertsThread * > _available;

  // Every worker, busy or idle.  The destructor uses this list to delete
  // the workers, and debugReport() uses it to list them.
  std::vector< StreamingAlertsThread * > _all;

  // This keeps track of the active workers by socket.  If a socket is
  // deleted at the global level, we won't get a response from that thread,
  // but we know that the worker will be free.
  std::map< SocketInfo *, StreamingAlertsThread * > _bySocket;

  // This is the minimum number of items which must remain idle.  If you create
  // 10 threads, and _reserved = 0, then you can use all 10 threads.  If you
  // create 10 threads and _reserved = 3, then you can only use 7 threads at a
  // time.  You will still cycle through the threads in the same order.  We
  // tried this once before.  It wasn't useful at the time, but two things
  // have changed.
  // 1) We are now allowing the administrator to change this value in the
  // config file and at run time.  This is the easy way to try different
  // numbers of worker threads to fine tune the server.
  // 2) We now have more database servers and more front end servers.  This
  // allows a front end server to name each database server twice, so we
  // use them all an equal amount, even if we only want to use 5 threads,
  // or some other number which is not evenly divisible.
  // This value will never be negative.  This value will never be as large as
  // the total number of threads.
  int _reserved;

public:
  // Creates one worker per entry in the "worker_databases" config item.
  // Each worker is constructed with returnPath and returnId (presumably
  // where it posts its finished requests; UserRequestControl passes its own
  // queue and mtRealTimeData).
  WorkerThreadList(RequestListener *returnPath, int returnId);

  // Deletes every worker.
  ~WorkerThreadList();

  // Number of idle workers we may hand out right now: the idle count minus
  // _reserved, never less than 0.
  int available();

  // Takes the next idle worker and records it as working for socket.
  // Returns NULL if available() is 0.  socket must not already have a
  // worker.
  StreamingAlertsThread *pop(SocketInfo *socket);

  // Puts socket's worker back at the end of the idle queue.
  bool release(SocketInfo *socket); // returns false if there was no work to do.

  // True if a worker is currently assigned to socket.
  bool workInProgress(SocketInfo *socket);

  // Writes the state of this list to the log and asks each worker for its
  // own debugReport().
  void debugReport();

  // NOTE(review): the definitions of setReserved() and setUsable() are not
  // present in this copy of the file; see the note on debugReport().
  void setReserved(int v);
  void setUsable(int v);
  int getReserved() { return _reserved; }
};

// Builds lists of all workers, the idle workers (in queue order) and the
// busy workers, sends one log line per busy worker tagged with its socket,
// and asks each worker for its own debugReport().
//
// NOTE(review): this copy appears damaged.  Expressions like
// "all<getName()" were presumably "all<<thread->getName()" before the
// '<...>' text was stripped.  Everything after the final <<"all"< up to the
// constructor's "pieces" variable is missing: the rest of this function,
// setReserved(), setUsable() and the opening of the constructor.  Restore
// from version control.
void WorkerThreadList::debugReport()
{
  TclList all;
  TclList available;
  TclList bySocket;
  for (int i = _all.size(); i; i--)
  {
    StreamingAlertsThread *thread = _all[i-1];
    all<getName();
    thread->debugReport();
  }
  // std::queue can't be iterated, so rotate it all the way around.  Each
  // element is popped from the front and pushed on the back exactly once,
  // which leaves the queue in its original order.
  for (int i = _available.size(); i; i--)
  {
    StreamingAlertsThread *thread = _available.front();
    _available.pop();
    available<getName();
    _available.push(thread);
  }
  for (std::map< SocketInfo *, StreamingAlertsThread * >::iterator it = _bySocket.begin();
       it != _bySocket.end(); it++)
  {
    StreamingAlertsThread *thread = it->second;
    bySocket<getName();
    TclList msg;
    msg<<"UserRequestControl.C"
       <<"debugReport()"
       <<"_bySocket"
       <getName();
    LogFile::primary().sendString(msg, it->first);
  }
  TclList msg;
  msg<<"UserRequestControl.C"
     <<"debugReport()"
     <<"all"<pieces = explode(",", getConfigItem("worker_databases", "@worker,@worker,@worker,@worker,@worker"));
  // One worker per comma-separated entry in "worker_databases".
  for (std::vector< std::string >::iterator it = pieces.begin();
       it != pieces.end(); it++)
  {
    _all.push_back(new StreamingAlertsThread(returnPath, returnId, *it));
  }
  // Every worker starts out idle.
  for (std::vector< StreamingAlertsThread * >::iterator it = _all.begin();
       it != _all.end(); it++)
  {
    _available.push(*it);
  }
  setReserved(strtolDefault(getConfigItem("worker_databases_reserved"), 0));
}

// Deletes every worker, busy or idle.
WorkerThreadList::~WorkerThreadList()
{
  for (std::vector< StreamingAlertsThread * >::iterator it = _all.begin();
       it != _all.end(); it++)
  {
    delete *it;
  }
}

// Idle workers minus the reserved count, clamped at 0.
int WorkerThreadList::available()
{
  // NOTE(review): _available.size() is unsigned, so when fewer than
  // _reserved workers are idle this subtraction wraps before it is
  // converted back to int.  The result comes out negative on common
  // compilers (and is guaranteed to since C++20), so the check below works.
  // static_cast<int>(_available.size()) - _reserved would say so directly.
  int result = _available.size() - _reserved;
  if (result >= 0)
  {
    return result;
  }
  else
  {
    return 0;
  }
}

// Hands the front idle worker to socket, or returns NULL if none may be used.
StreamingAlertsThread *WorkerThreadList::pop(SocketInfo *socket)
{
  assert(socket);
  StreamingAlertsThread *result = NULL;
  if (available())
  {
    result = _available.front();
    _available.pop();
    // A socket may have at most one worker at a time.
    StreamingAlertsThread *&active = _bySocket[socket];
    assert(!active);
    active = result;
  }
  return result;
}

// Returns socket's worker to the back of the fifo, so the workers keep being
// used in rotation.  Returns false if socket had no worker.
bool WorkerThreadList::release(SocketInfo *socket)
{
  assert(socket);
  std::map< SocketInfo *, StreamingAlertsThread * >::iterator it =
    _bySocket.find(socket);
  if (it != _bySocket.end())
  {
    _available.push(it->second);
    _bySocket.erase(it);
    return true;
  }
  return false;
}

bool WorkerThreadList::workInProgress(SocketInfo *socket)
{
  // Returns true if a worker is currently assigned to work on a request
  // for this socket.
  return _bySocket.count(socket);
}


/////////////////////////////////////////////////////////////////////
// UserRequestControl
//
// This class/thread manages the requests for alerts data.  This is
// where real-time and historical requests start.  Long history
// mostly takes care of itself after we initialize it.  Short history
// is sent to another thread to be merged back into the real-time
// data.  This thread closely controls the timing of the real-time
// requests, and can send these requests off to a number of threads
// attached to a number of database servers.
/////////////////////////////////////////////////////////////////////

class UserRequestControl : private ThreadClass
{
private:
  enum MessageTypes {
    mtSetConfigInfo,     /* Client is sending a new config request for a window.
                          * Adds a new window, or updates an existing window. */
    mtRemoveConfigInfo,  /* Client deletes a window. */
    mtGetAlerts,         /* Request alert data.  This is also used for status,
                            since it is a periodic message.  The client sends
                            this right away, and the server decides when to
                            answer. */
    mtShortHistory,      /* When we first get a new configuration, we might ask
                            for some recent history.  This is just one line at
                            most, so people won't have to look at a blank
                            screen. */
    mtRealTimeData,      /* From StreamingAlertsThread.C */
    mtSetWorkerDatabaseReserved, /* Administrator command
                                    SET_worker_databases_reserved: change
                                    WorkerThreadList's reserved count at run
                                    time. */
    mtReleaseConnections, /* Administrator command release_connections:
                             disconnect some clients at a controlled rate.
                             See checkForRelease(). */
    mtStatus,            /* For debugging purposes.  Dumps a lot of information about
                          * the current connection.  All in XML, but made for a user to
                          * see.  This will also dump some items to the log file. */
    mtQuit               /* Standard destructor / end of thread call. */
  };
  DatabaseWithRetry _readOnlyDatabase;
  DatabaseWithRetry _writableDatabase;
  SelectableRequestQueue _incomingRequests;
  WorkerThreadList _workerThreadList;
  ShortHistoryHandler _shortHistoryHandler;
  LastIdCacheInfo _lastIdCache;
  // Largest id passed to updateLastId().  getLastId() never returns less.
  AlertId _lastExternalId;
  // Connections still to be disconnected (set by mtReleaseConnections and
  // consumed one at a time by checkForRelease()).
  int _needToRelease;
  // Microseconds between two forced disconnects.
  TimeVal::Microseconds _releasePeriod;
  // checkForRelease() does nothing before this time.
  TimeVal _nextReleaseTime;
  static UserRequestControl *instance;
  UserRequestControl();
  ~UserRequestControl();
  static void fillStatus(XmlNode &reply, SocketInfo *socket);
  void checkForRelease();
protected:
  void threadFunction();
public:
  // Creates the singleton (whose constructor starts the thread).  Call
  // exactly once.
  static void init();
  static UserRequestControl &getInstance() { return *instance; }
  // We export these to the rest of the file for simplicity.  Sometimes
  // we have to go through several function calls to get from this
  // class to the place where the database is used.  This gets particularly
  // complicated with two databases.
  DatabaseWithRetry &getReadOnlyDatabase() { return _readOnlyDatabase; }
  DatabaseWithRetry &getWritableDatabase() { return _writableDatabase; }
  WorkerThreadList &getWorkerThreadList() { return _workerThreadList; }
  ShortHistoryHandler &getShortHistoryHandler() { return _shortHistoryHandler; }
  // The most recent alert id we know of.  verifyLastId() passes false for
  // limited users and true otherwise (live vs. delayed data; see the
  // definition).
  AlertId getLastId(bool live);
  // Reports an alert id seen elsewhere; getLastId() will never return less
  // than the largest such id.
  void updateLastId(AlertId id);
};


/////////////////////////////////////////////////////////////////////
// MonitorPerformance
//
// Statistics about the real-time (get_alerts) work: how late requests are
// started, how much worker time they use, and how many null responses are
// sent.  There is a single instance, monitorPerformance.
/////////////////////////////////////////////////////////////////////

class MonitorPerformance
{
private:
  TimeVal _nextUpdateTime;  // When to print the statistics.

  // These are the people who are "ready" for data.
  TimeVal::Microseconds _timeConsumed;  // Time spent servicing user.
  int _windows;  // Number of windows given real-time data.
  int _users;  // Number of users given real-time data.
  int _nullResponses; /* The number of times we sent a keep-alive without
                       * even trying to go to the database. */

  // These are all the people we pull from the queue.  Not always the
  // same as above, and the relevant data comes from a different
  // function, so we track these separately.
  TimeVal::Microseconds _lateTime;
  TimeVal::Microseconds _items;

  void clearStatistics();
  void checkStatistics(TimeVal now);
public:
  MonitorPerformance();

  // This refers to the late time when we grab an item from the queue.
  void addItem(TimeVal expectedRunTime, TimeVal now);

  // This is the work done in the other thread.  Call once per request.
  void recordWork(int windows, TimeVal::Microseconds timeConsumed);

  // Called each time checkForNulls() answers a request with a null response.
  void addNullResponse();
};

static MonitorPerformance monitorPerformance;


/////////////////////////////////////////////////////////////////////
// QueryInfo
//
// Everything this thread tracks for one socket:
//  - its alert windows;
//  - its pending get_alerts message (at most one);
//  - any outstanding short-history requests;
//  - the alert id its real-time data continues from (_lastId).
// Instances are created on demand by find() and deleted by
// releaseQueryInfo().
/////////////////////////////////////////////////////////////////////

static const std::string S_no_work_at_all = "no_work_at_all";
static const std::string S_no_work = "no_work";
static const std::string S_no_workers = "no_workers";

class QueryInfo : FixedMalloc
{
private:
  static std::map< SocketInfo *, QueryInfo * > queryInfoBySocket;

  // Key for validGetARequests.  std::pair compares the time first, so the
  // set is ordered by when each request should be answered.  begin() is
  // always the one due next.  The QueryInfo pointer keeps keys unique.
  struct GetAKey : public std::pair< TimeVal, QueryInfo * >
  {
    TimeVal const &getTimeVal() const { return first; }
    QueryInfo *getQueryInfo() const { return second; }
    GetAKey() {}
    GetAKey(TimeVal const &timeVal, QueryInfo *queryInfo) :
      std::pair< TimeVal, QueryInfo * >(timeVal, queryInfo) { }
  };

  // One entry for each socket whose get_alerts message is waiting to be
  // handed to a worker.  startRequests() removes the entry when it sends the
  // work to a worker.  releasePendingGetARequest() removes it when the
  // message gets a null response.
  static std::set< GetAKey > validGetARequests;

  // State for one window on this socket.
  struct SingleQuery
  {
    AlertSqlProducer sqlProducer;
    AlertConfig::CustomSql sql;
    // Outstanding short-history request, or NULL.
    ShortHistoryRequest *shortHistoryPending;
    // Short-history result waiting to go out with the next reply.  Only
    // meaningful while historyResultValid is true.
    XmlNode historyResult;
    // When this window was configured or last got new data (time(NULL)).
    // createStreamingAlertsRequest() uses it to choose the alert limit.
    time_t lastDataReceived;
    bool historyResultValid;
    // True once the window has data (or the client asked to skip history).
    // After that a short history is no longer requested.
    bool dataPresent;
    // True when HistoryHandler accepted the window as a history request.
    // Such windows are skipped by the real-time code in this class.
    bool isLongHistory;
    // True once the window's query has been put in a StreamingAlertsRequest.
    bool sentToThread;
    SingleQuery() :
      shortHistoryPending(NULL), lastDataReceived(0),
      historyResultValid(false), dataPresent(false), isLongHistory(false),
      sentToThread(false) { }
    // Debug output used by the status command.
    void dump(XmlNode &node)
    {
      sql.dump(node);
      node.properties["SHORT_HISTORY_PENDING"] = shortHistoryPending?"1":"0";
      if (historyResultValid)
      {
        node["SHORT_HISTORY"][-1] = historyResult;
      }
      if (dataPresent)
      {
        node.properties["DATA_PRESENT"] = "1";
      }
    }
  };
  std::map< std::string, SingleQuery > _queryByWindowId;
  // Reverse map from each outstanding short-history request to its window.
  std::map< ShortHistoryRequest *, std::string > _windowIdByShortHistory;
  SocketInfo *_socket;
  // Alert id to continue from.  It stays displayNoAlertData until
  // verifyLastId() fills it in.
  AlertId _lastId;
  // When the pending get_alerts message may be answered.  This is also the
  // time part of this socket's GetAKey.
  TimeVal _nextGetAResponse;
  // When the most recent get_alerts message arrived, in microseconds.
  TimeVal::Microseconds _lastGetARequest;
  ExternalRequest::MessageId _getAMessageId; // Not present() when no message is pending.
  int _alertsUntilFree; // When this gets to 0, DEMO customers see the symbol.

  QueryInfo(SocketInfo *socket) :
    _socket(socket),
    _lastId(displayNoAlertData), // We will ask for this when we need it.
    _lastGetARequest(0),
    _alertsUntilFree(maxRTAlertCount)
  {
  }

  ~QueryInfo()
  {
    queryInfoBySocket.erase(_socket);
    releasePendingGetARequest();
    // If the socket is going down, this will happen automatically.  (And this
    // particular request will be thrown on the floor by the queue.)  But if we
    // are just aborting this request, perhaps another-user-abort, then we can
    // get rid of these and save some effort.
    UserRequestControl::getInstance().getShortHistoryHandler().abort(_socket);
  }

  // This socket's key in validGetARequests.  It depends on
  // _nextGetAResponse, so it only matches the stored key while that value
  // is unchanged.
  GetAKey getGetAKey() { return GetAKey(_nextGetAResponse, this); }

  // If a get_alerts message is waiting, answer it with an empty reply,
  // remove it from validGetARequests and forget it.
  void releasePendingGetARequest()
  {
    if (_getAMessageId.present())
    {
      nullQuery(_socket, _getAMessageId);
      validGetARequests.erase(getGetAKey());
      _getAMessageId.clear();
    }
  }

  // Sends an empty "API" reply to messageId, if messageId is present.
  static void nullQuery(SocketInfo *socket,
                        ExternalRequest::MessageId messageId)
  {
    if (messageId.present())
    {
      addToOutputQueue(socket, XmlNode().asString("API"), messageId);
    }
  }

  void verifyLastId(UserInfoStatus status)
  {
    // Make sure the value of _lastId is reasonable.
    //
    // There is no other function which checks the user info to get
    // this id.  The first call to this function will get the id.  The
    // remaining calls will exit immediately.
    //
    // We only need to read this value once.  This is the initial value from
    // the login request.  After that we update it ourselves.  We don't expect
    // arbitrary changes from the userinfo thread.  It can log someone off,
    // which will change the value.  Otherwise, this value is provided as
    // part of the log in message.  Note that the command handler freezes a
    // socket's request queue while the login message is being processed,
    // so we don't have to worry about the case where the data request comes
    // to us before the user info is ready.  (That used to be a problem, but
    // it was fixed by freezing the queue.)
    //
    // Note that we have to read this value from the database in this thread.
    // Otherwise we might have the short history and the real-time data
    // starting from different times.
    if (_lastId == displayNoAlertData)
    {
      switch (status)
      {
      case sFull:
      case sSuspended:
        {
          // See if the user requested a value.
          _lastId = userInfoGetLastId(_socket);
          if (_lastId == displayNoAlertData)
          {
            // No value was specified.  Start from the current end of
            // the queue.
            _lastId = UserRequestControl::getInstance().getLastId(true);
          }
          else
          {
            // Verify that this is a reasonable id.  If it's too high,
            // we assume that the database was reset, and the user has an
            // old value.  If it's too low, that will be taken care of
            // elsewhere.
            _lastId = std::min(UserRequestControl::getInstance().getLastId(true), _lastId);
            // Warning!  If our id cache is too far out of date, we
            // could duplicate some data for the user.
          }
          break;
        }
      case sLimited:
        {
          // We have to look this value up for ourselves.  We can't get
          // the value directly from userInfoGetLastId() because we need
          // to use the database, and data connections are associated
          // with a specific thread.
          _lastId = UserRequestControl::getInstance().getLastId(false);
          break;
        }
      default:
        {
          // We can't do anything in this case.  (Need this to avoid a
          // compiler warning.)
        }
      }
    }
  }

  // Same as above, using this socket's current user-info status.
  void verifyLastId()
  {
    verifyLastId(userInfoGetInfo(_socket).status);
  }

  // Forgets singleQuery's outstanding short-history request, if any.  Pass
  // deleteFromShortHistoryThread = false when the request has already come
  // back from the short-history handler (processShortHistory()), so there is
  // nothing left there to abort.
  void releaseShortHistoryRequest(SingleQuery &singleQuery,
                                  bool deleteFromShortHistoryThread = true)
  {
    if (singleQuery.shortHistoryPending)
    {
      if (deleteFromShortHistoryThread)
      {
        UserRequestControl::getInstance().
          getShortHistoryHandler().
          abort(_socket, singleQuery.shortHistoryPending);
      }
      _windowIdByShortHistory.erase(singleQuery.shortHistoryPending);
      singleQuery.shortHistoryPending = NULL;
    }
  }

  // Same as above, by window id.  Does nothing for an unknown window.
  void releaseShortHistoryRequest(std::string const &windowId,
                                  bool deleteFromShortHistoryThread = true)
  {
    SingleQuery *singleQuery = getProperty(_queryByWindowId, windowId);
    if (singleQuery)
    {
      releaseShortHistoryRequest(*singleQuery, deleteFromShortHistoryThread);
    }
  }

  // Replaces any outstanding short-history request for windowId with a new
  // one, starting at _lastId, and sends it to the short-history handler.
  void requestShortHistory(std::string windowId)
  {
    SingleQuery &singleQuery = _queryByWindowId[windowId];
    releaseShortHistoryRequest(singleQuery);
    verifyLastId();
    singleQuery.shortHistoryPending =
      new ShortHistoryRequest(_socket, _lastId, singleQuery.sql);
    UserRequestControl::getInstance().getShortHistoryHandler().
      request(singleQuery.shortHistoryPending);
    _windowIdByShortHistory[singleQuery.shortHistoryPending] = windowId;
  }

  // Builds a new StreamingAlertsRequest starting at _lastId.  It contains
  // every window that is not a long-history window.  startRequests() passes
  // the result straight to a worker.
  StreamingAlertsRequest *createStreamingAlertsRequest()
  {
    verifyLastId();
    StreamingAlertsRequest *request =
      new StreamingAlertsRequest(_socket, _lastId);
    const time_t now = time(NULL);
    for (std::map< std::string, SingleQuery >::iterator it =
           _queryByWindowId.begin();
         it != _queryByWindowId.end();
         it++)
    {
      SingleQuery &singleQuery = it->second;
      if (!singleQuery.isLongHistory)
      {
        // Seconds since this window last got data (or was configured).
        const time_t lastDataReceived = singleQuery.lastDataReceived;
        const int age = now - lastDataReceived;
        int limit;
        if (age < 50)
          // Normal streaming alerts.  Only send a limited number of
          // alerts at a time.  Otherwise someone could turn on high
          // speed test and bring down the system.
          limit = 35;
        else if (age < 590)
          // Alerts that fire at the end of a 1 minute candle or a 5
          // minute candle.  Show them twice as much, because a lot
          // of alerts happen at once.  We still don't open the flood
          // gates for a number of reasons.  For one thing, this will
          // happen for a lot of users at once, and we don't want to
          // overload our servers.
          limit = 70;
        else
          // Alerts that fire at the end of a 10 minute or longer
          // candle.  NYSE imbalances.  Presumably this also catches
          // a lot of strategies that were waiting for the open,
          // although that was not the intent.
          limit = 140;
        request->addOriginalQuery(it->first, singleQuery.sqlProducer,
                                  singleQuery.sql, limit);
        singleQuery.sentToThread = true;
      }
    }
    return request;
  }

public:
  // Returns the QueryInfo for socket, creating it on first use.
  static QueryInfo *find(SocketInfo *socket)
  {
    // This makes use of the userinfo.  Initialize the userinfo first.
    QueryInfo *&result = queryInfoBySocket[socket];
    if (!result)
    {
      result = new QueryInfo(socket);
    }
    return result;
  }

  // Deletes socket's QueryInfo, if there is one.  The destructor removes the
  // map entry itself, so "it" is not used after the delete.
  static void releaseQueryInfo(SocketInfo *socket)
  {
    std::map< SocketInfo *, QueryInfo * >::iterator it =
      queryInfoBySocket.find(socket);
    if (it != queryInfoBySocket.end())
    {
      delete it->second;
    }
  }

  // Records a get_alerts message.  At most one message per socket is queued.
  // A second one that arrives while one is pending gets an empty reply right
  // away.
  void addGetARequest(ExternalRequest::MessageId messageId)
  {
    _lastGetARequest = TimeVal(true).asMicroseconds();
    if (messageId.present())
    {
      if (_getAMessageId.present())
      {
        // We already have a request pending.  Throw away the new one
        // immediately and keep the one that's queued up.
        nullQuery(_socket, messageId);
      }
      else
      {
        _getAMessageId = messageId;
        // For simplicity we only check for status (i.e. not logged in)
        // when we satisfy the request.  We put it in the queue regardless
        // of the status.
        validGetARequests.insert(getGetAKey());
      }
    }
  }

  // Called when a short-history request comes back.  A non-empty result is
  // stored so finishQueries() can send it with the next reply.  Requests we
  // no longer track are ignored, for example when the window was
  // reconfigured or removed in the meantime.
  void processShortHistory(ShortHistoryRequest *request)
  {
    std::map< ShortHistoryRequest *, std::string >::iterator it =
      _windowIdByShortHistory.find(request);
    if (it != _windowIdByShortHistory.end())
    {
      std::string windowId = it->second;
      SingleQuery &singleQuery = _queryByWindowId[windowId];
      releaseShortHistoryRequest(singleQuery, false);
      if (!request->result.empty())
      {
        singleQuery.historyResult = request->result;
        singleQuery.historyResultValid = true;
        singleQuery.dataPresent = true;
      }
    }
  }

  // How long the main loop may sleep before calling startRequests() again.
  // NULL means no timeout.  A non-NULL result points to a function-local
  // static, so it is only valid until the next call.
  static timeval *untilNextRequest(WorkerThreadList &workers)
  {
    std::set< GetAKey >::iterator it = validGetARequests.begin();
    if (it == validGetARequests.end())
    {
      // No work to do!  Wait forever.
      ThreadMonitor::find().increment(S_no_work_at_all);
      return NULL;
    }
    else
    {
      static timeval _untilNextRequest;
      if (workers.available())
      {
        // This is when the next job will be ready.
        _untilNextRequest = it->getTimeVal().waitTime();
        ThreadMonitor::find().increment(S_no_work);
      }
      else
      {
        // There are jobs, but no workers.  Wait for some external event,
        // like a new worker thread being ready, or a new job being
        // submitted.  Every second run startRequests() anyway,
        // just to see if we need to send null responses.
        //
        // Using a fixed 1 second here seems a bit rough.  Ideally we'd
        // use the same counter as is built into checkForNulls().  But
        // the sleep routine isn't that precise, anyway, so this is about
        // as good.  (If we wake up because of time, we usually wake up
        // 1/2 a millisecond or so too soon and force ourselves back to
        // sleep, effectively making a busy wait.)
        _untilNextRequest.tv_sec = 1;
        _untilNextRequest.tv_usec = 0;
        ThreadMonitor::find().increment(S_no_workers);
      }
      return &_untilNextRequest;
    }
  }

  static void checkForNulls()
  {
    // If things are going too slowly, we will send a null response to
    // everyone.  emptyResponseTime is the minimum amount of time a connection
    // has to be waiting before we send a null response.  This is not very
    // precise, and the actual time will be between emptyResponseTime and
    // twice that value.
    //
    // This is required to keep the clients from timing out when we are alive
    // but the database is slow.  Otherwise all the clients will time out
    // together, and make things even worse.
    //
    // It seems that this was broken when we added multiple database threads.
    // I think I've fixed it, but I haven't tested it yet.  startRequests()
    // calls this function at the end of every pass.  (An older version of
    // this comment said that no one called it.)
    static const TimeVal::Microseconds emptyResponseTime = 5000000;
    static TimeVal::Microseconds nextEmptyCheck = 0;
    const TimeVal::Microseconds now = TimeVal(true).asMicroseconds();
    if (now >= nextEmptyCheck)
    {
      // A message that arrived before this time has waited at least
      // emptyResponseTime.
      const TimeVal::Microseconds overdue = now - emptyResponseTime;
      std::set< GetAKey >::iterator it = validGetARequests.begin();
      while (it != validGetARequests.end())
      {
        // Update our iterator now, because it might be invalidated
        // before we get to the bottom of the loop.
        std::set< GetAKey >::iterator next = it;
        next++;
        QueryInfo *current = it->getQueryInfo();
        if (current->_lastGetARequest < overdue)
        {
          current->releasePendingGetARequest();
          monitorPerformance.addNullResponse();
        }
        it = next;
      }
      nextEmptyCheck = now + emptyResponseTime;
    }
  }

  static void startRequests(WorkerThreadList &workers)
  {
    // Match jobs which are ready with workers which are ready.
    while (workers.available())
    {
      std::set< GetAKey >::iterator it = validGetARequests.begin();
      if (it == validGetARequests.end())
      {
        // The timer/work queue is empty.
        break;
      }
      QueryInfo *queryInfo = it->getQueryInfo();
      TimeVal now(true);
      if (it->getTimeVal() > now)
      {
        // The timer/work queue does not have anything ready to be
        // run yet.
        break;
      }
      // We found one!
      monitorPerformance.addItem(it->getTimeVal(), now);
      // The request is now in flight.  _getAMessageId stays set until
      // finishQueries() runs, so addGetARequest() will not re-queue this
      // socket in the meantime.
      validGetARequests.erase(queryInfo->getGetAKey());
      assert(queryInfo->_getAMessageId.present());
      workers.pop(queryInfo->_socket)->
        requestData(queryInfo->createStreamingAlertsRequest());
    }
    checkForNulls();
  }

  // Called when a worker hands back request for this socket.
  //  - Unless the user's status is sNone, merges each window's results and
  //    any pending short history into the reply and sends it.
  //  - Forgets the get_alerts message.
  //  - Schedules when the next one may be answered.
  // NOTE(review): with status sNone no reply is sent, but _getAMessageId is
  // still cleared.  Confirm that the client is not expected to get an answer
  // in that case.
  void finishQueries(StreamingAlertsRequest *request)
  {
    ThreadMonitor &m = ThreadMonitor::find();
    std::string saveState = m.getState();
    m.setState("finishQueries_01_start");
    assert(_getAMessageId.present());
    assert(!validGetARequests.count(getGetAKey()));
    UserInfoStatus status = userInfoGetInfo(_socket).status;
    if (status != sNone)
    {
      m.setState("finishQueries_02_collect");
      const time_t now = time(NULL);
      for (std::map< std::string, SingleQuery >::iterator it =
             _queryByWindowId.begin();
           it != _queryByWindowId.end();
           it++)
      {
        SingleQuery &singleQuery = it->second;
        if (!singleQuery.isLongHistory)
        {
          std::string windowId = it->first;
          bool sqlIsNew = false;
          if (singleQuery.sentToThread)
            request->getQuery(windowId, singleQuery.sqlProducer,
                              singleQuery.sql, sqlIsNew);
          bool newDataPresent =
            request->addFinalQuery(windowId, singleQuery.sentToThread);
          if (newDataPresent)
            singleQuery.lastDataReceived = now;
          singleQuery.dataPresent =
            singleQuery.dataPresent || newDataPresent;
          if (sqlIsNew && !singleQuery.dataPresent)
          {
            // This is our first opportunity to request the short
            // history because we just now got the SQL for it.  But,
            // of course, don't request it if we already have data.
            requestShortHistory(windowId);
          }
          // Send the stored short history only when there is no fresh data
          // for this window in this reply.
          if ((!newDataPresent) && singleQuery.historyResultValid)
          {
            request->addData(windowId, singleQuery.historyResult);
          }
          if (singleQuery.historyResultValid)
          {
            singleQuery.historyResultValid = false;
            singleQuery.historyResult.clear();
          }
          else if (singleQuery.dataPresent)
          {
            releaseShortHistoryRequest(singleQuery);
          }
        }
      }
      m.setState("finishQueries_03_trim");
      request->trimResponse();
      if (status == sLimited)
      {
        request->hideSomeData(_alertsUntilFree);
      }
      m.setState("finishQueries_05_send");
      DeferedXmlReply::send(_socket, request->getResponse(), _getAMessageId);
    }
    _getAMessageId.clear();
    m.setState("finishQueries_06_next");
    _lastId = request->getLastId();
    TimeVal::Microseconds addTime;
    // This is the "normal" amount of time we want to wait for the next query.
    if (status == sNone)
    {
      addTime = 10000000;
    }
    else if (status == sLimited)
    {
      addTime = 3000000;
    }
    else if (request->getMaxedOut())
    {
      addTime = 1500000;
    }
    else
    {
      addTime = 1000000;
    }
    // Then we penalize people with slow requests.  However long your request
    // took, we add that much time again.  (We've already added that time once,
    // implicitly, because we are adding this time to the end of the query,
    // not to when the query started, or should have started.)  The (explicit
    // part of the) delay is never more than double the "normal" delay.  A
    // delay that large would probably not be caused by the user.
    addTime += std::min(addTime, request->getQueryTime());
    // The following limit is used only for testing.  This puts an absolute
    // maximum, despite any and all other logic.
    // NOTE(review): the template argument of numeric_limits appears to have
    // been stripped from this copy (presumably TimeVal::Microseconds).
    static TimeVal::Microseconds maxGetADelay =
      strtollDefault(getConfigItem("max_geta_delay"),
                     std::numeric_limits::max());
    addTime = std::min(addTime, maxGetADelay);
    // The next request will be now plus the delay we computed.
    _nextGetAResponse.currentTime();
    _nextGetAResponse.addMicroseconds(addTime);
    monitorPerformance.recordWork(request->getWindowCount(),
                                  request->getQueryTime());
    m.setState(saveState);
  }

  // Deletes the window named by the request's window_id, cancels its short
  // and long history, and acknowledges the request if it has a message id.
  void removeWindow(ExternalRequest *request)
  {
    std::string windowId = request->getProperty("window_id");
    releaseShortHistoryRequest(windowId);
    _queryByWindowId.erase(windowId);
    HistoryHandler::getInstance().cancelRequest(_socket, windowId);
    if (request->getResponseMessageId().present())
    {
      addToOutputQueue(_socket, XmlNode().asString("API"),
                       request->getResponseMessageId());
    }
  }

  // If HistoryHandler accepts the request, the window becomes a long-history
  // window.  Otherwise it is loaded as a real-time query.
  // NOTE(review): longForm is read but never used in this function.
  void addWindow(ExternalRequest *request)
  {
    // Create a new window, or replace an existing window.
    std::string windowId = request->getProperty("window_id");
    std::string longForm = request->getProperty("long_form");
    bool skipHistory = request->getProperty("skip_history") == "1";
    SingleQuery &singleQuery = _queryByWindowId[windowId];
    releaseShortHistoryRequest(singleQuery);
    HistoryHandler::getInstance().cancelRequest(_socket, windowId);
    if (HistoryHandler::getInstance().createHistoryRequest(request))
    {
      singleQuery.isLongHistory = true;
    }
    else
    {
      singleQuery.sqlProducer.load(request);
      singleQuery.isLongHistory = false;
      singleQuery.lastDataReceived = time(NULL);
    }
    singleQuery.dataPresent = skipHistory;
    singleQuery.historyResultValid = false;
    singleQuery.sentToThread = false;
    singleQuery.historyResult.clear();
  }

  static int getSocketCount()
  {
    // This is the number of active sockets.
    return queryInfoBySocket.size();
  }

  // Debug dump of this socket's state, used by the status command.
  XmlNode xmlDump()
  {
    XmlNode result;
    result.properties["LAST_ID"] = itoa(_lastId);
    for (std::map< std::string, SingleQuery >::iterator it =
           _queryByWindowId.begin();
         it != _queryByWindowId.end();
         it++)
    {
      XmlNode &node = result["QUERY"][-1];
      node.properties["WINDOW_ID"] = it->first;
      it->second.dump(node);
    }
    return result;
  }

  // Asks DeleteSocketThread to disconnect the socket whose queued get_alerts
  // message is due last (the end of validGetARequests).  Returns false if no
  // socket has a queued message.
  // NOTE(review): the chosen socket presumably stays in validGetARequests
  // until the DeleteSocketThread callback reaches this thread, so a second
  // call before then may pick the same socket again.
  static bool releaseOne()
  {
    if (validGetARequests.empty())
    {
      return false;
    }
    DeleteSocketThread::deleteSocket(validGetARequests.rbegin()->second->_socket);
    return true;
  }
};

std::map< SocketInfo *, QueryInfo * > QueryInfo::queryInfoBySocket;

std::set< QueryInfo::GetAKey > QueryInfo::validGetARequests;


/////////////////////////////////////////////////////////////////////
// UserRequestControl
/////////////////////////////////////////////////////////////////////

// Creates the singleton.  The constructor starts the thread.
void UserRequestControl::init()
{
  assert(!instance);
  instance = new UserRequestControl();
}

// Starts the thread and registers for the client and administrator commands
// this thread handles.  Workers and the short-history handler report back
// through _incomingRequests as mtRealTimeData and mtShortHistory.
UserRequestControl::UserRequestControl() :
  ThreadClass("UserRequestControl"),
  _readOnlyDatabase(true, "UserRequestControl Read-Only"),
  _writableDatabase(false, "UserRequestControl Writable"),
  _incomingRequests("UserRequestControl"),
  _workerThreadList(&_incomingRequests, mtRealTimeData),
  _shortHistoryHandler(&_incomingRequests, mtShortHistory),
  _lastExternalId(0),
  _needToRelease(0),
  _releasePeriod(50000) // 50ms, i.e. 20 disconnects per second by default.
{
  startThread();
  CommandDispatcher *c = CommandDispatcher::getInstance();
  c->listenForCommand("status", &_incomingRequests, mtStatus);
  c->listenForCommand("get_alerts", &_incomingRequests, mtGetAlerts);
  c->listenForCommand("set_config_info", &_incomingRequests, mtSetConfigInfo);
  c->listenForCommand("remove_config_info", &_incomingRequests,
                      mtRemoveConfigInfo);
  c->listenForCommand("SET_worker_databases_reserved", &_incomingRequests,
                      mtSetWorkerDatabaseReserved);
  c->listenForCommand("release_connections", &_incomingRequests,
                      mtReleaseConnections);
}

// Asks the thread to exit (mtQuit) and waits for it to finish.
UserRequestControl::~UserRequestControl()
{
  Request *r = new Request(NULL);
  r->callbackId = mtQuit;
  _incomingRequests.newRequest(r);
  waitForThread();
}

AlertId UserRequestControl::getLastId(bool live)
{
  // We use our own cache to have some control that things don't get too old,
  // but we also look at all the values from other threads because it could
  // cause problems if we were behind.  The threads that look for data don't
  // mind, but this main thread will ignore a request from a user to start
  // at an id if it is higher than we think the highest id is.  We look at the
  // writable database because it might have a higher value than the read-
  // only databases because it is the replication master.
  //
  // Adding live vs. 20 minute delayed data makes this more complicated.
  // It makes me wonder if we should cache this at all.  The next step, that
  // we've already discussed, is where anyone could pick their own delay.
  return std::max(getLastAlertId(_writableDatabase, _lastIdCache, live),
                  _lastExternalId);
}

// Remember the largest id anyone has reported to us.
void UserRequestControl::updateLastId(AlertId id)
{
  _lastExternalId = std::max(_lastExternalId, id);
}

// ThreadMonitor state names.  In this file they are only referenced from
// commented-out lines in threadFunction().
// NOTE(review): several of them (e.g. S_mtEcho, S_mtEditConfig) have no
// matching MessageTypes value in this file.
static const std::string S_mtStatus="mtStatus";
static const std::string S_mtEcho="mtEcho";
static const std::string S_mtSetConfigInfo="mtSetConfigInfo";
static const std::string S_mtRemoveConfigInfo="mtRemoveConfigInfo";
static const std::string S_mtDisconnect="mtDisconnect";
static const std::string S_mtGetAlerts="mtGetAlerts";
static const std::string S_mtEditConfig="mtEditConfig";
static const std::string S_mtEditConfigNew="mtEditConfigNew";
static const std::string S_mtAllAlertTypes="mtAllAlertTypes";
static const std::string S_mtShortHistory="mtShortHistory";
static const std::string S_mtRealTimeData="mtRealTimeData";
static const std::string S_DeleteSocketThread="DeleteSocketThread::callbackId";
static const std::string S_mtQuit="mtQuit";

// Server administration.  You might have more users on one server and fewer
// on another, you might want to balance that out.  Or you might be planning to
Or you might be planning to // take a server down, and you want to do that with as few side effects as // possible. // // Using the mtReleaseConnections command an administrator can tell the server // how many connections to release. And he can specify the rate at which to // disconnect the clients. This prevents the other servers from getting // overloaded by a flood of new login requests. inline void UserRequestControl::checkForRelease() { if (_needToRelease && (_nextReleaseTime <= TimeVal(true))) { _nextReleaseTime.addMicroseconds(_releasePeriod); QueryInfo::releaseOne(); _needToRelease--; } } void UserRequestControl::threadFunction() { ThreadMonitor &m = ThreadMonitor::find(); std::string defaultState = m.getState(); while (true) { _incomingRequests.waitForRequest(QueryInfo::untilNextRequest(_workerThreadList)); _incomingRequests.resetWaitHandle(); while (Request *current = _incomingRequests.getRequest()) { switch (current->callbackId) { case mtStatus: { //m.setState(S_mtStatus); //m.increment(S_mtStatus); ExternalRequest *request = dynamic_cast(current); SocketInfo *socket = request->getSocketInfo(); XmlNode reply; fillStatus(reply, socket); addToOutputQueue(socket, reply.asString("API"), request->getResponseMessageId()); _workerThreadList.debugReport(); break; } case mtSetWorkerDatabaseReserved: { ExternalRequest *externalRequest = dynamic_cast(current); SocketInfo *socket = externalRequest->getSocketInfo(); TclList msg; msg<<"mtSetWorkerDatabaseReserved" <<_workerThreadList.getReserved(); std::string requestStr = externalRequest->getProperty("new"); msg<= 0) { _workerThreadList.setReserved(requested); msg<<_workerThreadList.getReserved(); } else { msg<<"invalid"; } LogFile::primary().sendString(msg, socket); break; } case mtReleaseConnections: { ExternalRequest *externalRequest = dynamic_cast(current); SocketInfo *socket = externalRequest->getSocketInfo(); const double rate = strtodDefault(externalRequest->getProperty("rate"), -1); if ((rate >= 1) && (rate 
<= 1000)) { double period = 1 / rate; // seconds / connections period *= 1000000; // microseconds / connection _releasePeriod = (TimeVal::Microseconds)period; } _needToRelease += strtolDefault(externalRequest->getProperty("count"), 0); _needToRelease = std::max(0, _needToRelease); XmlNode reply; reply.properties["COUNT"] = ntoa(_needToRelease); reply.properties["PERIOD"] = ntoa(_releasePeriod); reply.properties["RATE"] = ntoa(1000000.0 / _releasePeriod); addToOutputQueue(socket, reply.asString("API"), externalRequest->getResponseMessageId()); TclList msg; msg<<"mtReleaseConnections" <<"count" <<_needToRelease <<"period" <<_releasePeriod; LogFile::primary().sendString(msg, socket); _nextReleaseTime.currentTime(); break; } case mtSetConfigInfo: { //m.setState(S_mtSetConfigInfo); //m.increment(S_mtSetConfigInfo); ExternalRequest *externalRequest = dynamic_cast(current); SocketInfo *socket = externalRequest->getSocketInfo(); QueryInfo::find(socket)->addWindow(externalRequest); break; } case mtRemoveConfigInfo: { //m.setState(S_mtRemoveConfigInfo); //m.increment(S_mtRemoveConfigInfo); ExternalRequest *externalRequest = dynamic_cast(current); SocketInfo *socket = externalRequest->getSocketInfo(); QueryInfo::find(socket)->removeWindow(externalRequest); break; } case mtGetAlerts: { //m.setState(S_mtGetAlerts); //m.increment(S_mtGetAlerts); ExternalRequest *externalRequest = dynamic_cast(current); SocketInfo *socket = externalRequest->getSocketInfo(); ExternalRequest::MessageId messageId = externalRequest->getResponseMessageId(); QueryInfo::find(socket)->addGetARequest(messageId); break; } case mtShortHistory: { //m.setState(S_mtShortHistory); //m.increment(S_mtShortHistory); ShortHistoryRequest *request = dynamic_cast(current); SocketInfo *socket = request->getSocketInfo(); QueryInfo::find(socket)->processShortHistory(request); break; } case mtRealTimeData: { //m.setState(S_mtRealTimeData); //m.increment(S_mtRealTimeData); StreamingAlertsRequest *request = 
dynamic_cast(current); SocketInfo *socket = request->getSocketInfo(); // Now that the thread is free, check immediately to see if we // have another job for it. bool released =_workerThreadList.release(socket); QueryInfo::startRequests(_workerThreadList); if (!released) { // This should never happen. It could be an assertion // failure, where I'd have to restart the software. // But maybe I could learn more by inspecting the logs. LogFile::primary().quoteAndSend ("UserRequestControl.C", "unable to release thread", socket); } QueryInfo::find(socket)->finishQueries(request); break; } case DeleteSocketThread::callbackId: { //m.setState(S_DeleteSocketThread); //m.increment(S_DeleteSocketThread); SocketInfo *socket = current->getSocketInfo(); QueryInfo::releaseQueryInfo(socket); bool released = _workerThreadList.release(socket); if (released) { m.increment("delete/release"); } break; } case mtQuit: { //m.setState(S_mtQuit); //m.increment(S_mtQuit); delete current; return; } } //m.setState(defaultState); delete current; } UserRequestControl::checkForRelease(); QueryInfo::startRequests(_workerThreadList); } } void UserRequestControl::fillStatus(XmlNode &reply, SocketInfo *socket) { reply["SOCKET"].properties["REMOTE_ADDR"] = socket->remoteAddr(); reply["SOCKET"].properties["SERIAL_NUMBER"] = ntoa(SocketInfo::getSerialNumber(socket)); //UserInfo::dump(socket, reply["USER_INFO"]); reply["QUERY_INFO"] = QueryInfo::find(socket)->xmlDump(); reply["HOST"].properties["NAME"] = getShortHostName(); } UserRequestControl *UserRequestControl::instance = NULL; ///////////////////////////////////////////////////////////////////// // MonitorPerformance ///////////////////////////////////////////////////////////////////// void MonitorPerformance::clearStatistics() { _timeConsumed = 0; _windows = 0; _users = 0; _lateTime = 0; _items = 0; _nullResponses = 0; } void MonitorPerformance::checkStatistics(TimeVal now) { if (now < _nextUpdateTime) { return; } if 
(_timeConsumed||_windows||_users||_items||_lateTime) { /* Usage is % of real time. Can be over 100% since we are sleeping, * and multiple threads can be sleeping at the same time. It's * perfectly reasonable to be over 100% since we can connect to * multiple servers, each with multiple CPUs. In addition to that * ideal case, multiple threads can be asleep on these servers, which * can alo be a good thing in limited quantities. */ double usage = _timeConsumed / 600000.0; char usageStr[24]; sprintf(usageStr, "%.6g%%", usage); TclList msg; msg<<"UserRequestControl.C" <<"MonitorPerformance" <<"active_sockets" < 0) { msg<<"avg late time" <<_lateTime / _items; } if (_nullResponses > 0) { msg<<"_nullResponses" <<_nullResponses; } LogFile::primary().sendString(msg); clearStatistics(); } // We usually make one printout every 60 seconds. We usually // keep adding 60 seconds to the original time, so that we won't // get any drift. However, if we are not called at all for over // 60 seconds, then we restart fresh from now. _nextUpdateTime.addSeconds(60); if (_nextUpdateTime < now) { _nextUpdateTime = now; _nextUpdateTime.addSeconds(60); } } MonitorPerformance::MonitorPerformance() : _nextUpdateTime(false) { clearStatistics(); } void MonitorPerformance::addItem(TimeVal expectedRunTime, TimeVal now) { checkStatistics(now); if (expectedRunTime) { // Sometimes, it appears, that the expectedRunTime is 0. That presumably // is a way to request that something run immediately. I suppose that // we could get better statistics by requesting the current time when // the request is created, rather than 0, although the meaning would // not be exactly the same. 
_lateTime += now.asMicroseconds() - expectedRunTime.asMicroseconds(); _items++; } } void MonitorPerformance::recordWork(int windows, TimeVal::Microseconds timeConsumed) { _users++; _timeConsumed += timeConsumed; _windows += windows; } inline void MonitorPerformance::addNullResponse() { _nullResponses++; } ///////////////////////////////////////////////////////////////////// // Main ///////////////////////////////////////////////////////////////////// bool initUserRequestControl() { HistoryHandler::initInstance(); // Long history. UserRequestControl::init(); return true; }