// NOTE(review): the names of the two system headers were lost when this text
// was extracted.  <algorithm> and <limits> are inferred from the use of
// std::min / std::max and std::numeric_limits in this file -- confirm.
#include <algorithm>
#include <limits>

#include "AlertSqlProducer.h"
#include "DatabaseThreadShared.h"
#include "../shared/LogFile.h"
#include "../shared/CommandDispatcher.h"
#include "../shared/ReplyToClient.h"
#include "UserInfo.h"
#include "Types.h"
#include "../shared/GlobalConfigFile.h"
#include "FormatTime.h"

#include "HistoryHandler.h"

/////////////////////////////////////////////////////////////////////
//
// The biggest problem with long history is that it can take a lot
// of resources.  The fundamental work of requesting the alerts
// is the same as for the realtime or short history functions.
// There are common library functions for setting up a query to find
// a specific maximum number of alerts within a specific range.  To
// keep from locking up the database, we break up the request so that
// we are looking at a small range of alerts in the database, and we
// never receive more than a small number of alerts.  We repeat that
// process until we are done.  The default request from the client
// will have to be split into a minimum of 4 pieces, although a long
// request can be split into many more pieces.
//
// A quick request could take far less than a second, and a long
// request could take several minutes.  We don't want someone to have
// to wait for a different request which could take arbitrarily long.
// So we use a FairRequestQueue to hold all of the requests.  We
// continuously remove a request from the queue, do a little work,
// update the request so we know where to pick up from next time,
// then throw it back into the queue.  This looks like a standard
// round-robin timeslice scheduler.
//
// Note:  We no longer own the FairRequestQueue directly.  Now we
// use a WorkerCluster, which includes several FairRequestQueue objects.
// The WorkerCluster splits the work up to cover multiple databases,
// to maximize the ability to cache data.
This was originally // developed for the OddsMaker, but the cache works best when this // unit uses the databases the same way as the OddsMaker. ///////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////// // ExternalHistoryRequest ///////////////////////////////////////////////////////////////////// class ExternalHistoryRequest : public Request { friend class HistoryRequest; private: enum TimeMode { tmDefault, tmId, tmTime }; TimeMode _startMode; TimeMode _endMode; union { time_t _startTime; AlertId _startId; }; time_t _endTime; int _maxCount; AlertSqlProducer _sqlProducer; public: ExternalHistoryRequest(ExternalRequest *request); ~ExternalHistoryRequest(); static bool belongsToUs(ExternalRequest *request); }; ExternalHistoryRequest::~ExternalHistoryRequest() { if (_sqlProducer.ready()) _sqlProducer.abandon(); } bool ExternalHistoryRequest::belongsToUs(ExternalRequest *request) { return request->getProperty("long_history") == "1"; } ExternalHistoryRequest::ExternalHistoryRequest(ExternalRequest *request) : Request(request->getSocketInfo()) { assert(belongsToUs(request)); _sqlProducer.load(request); _startId = strtolDefault(request->getProperty("long_history_start_id"), 0); if (_startId > 0) { _startMode = tmId; } else { _startTime = importTime(request->getProperty("long_history_start_time")); if (_startTime > 0) { _startMode = tmTime; } else { _startMode = tmDefault; } } _endTime = importTime(request->getProperty("long_history_end_time")); if (_endTime > 0) { _endMode = tmTime; } else { _endMode = tmDefault; } _maxCount = strtolDefault(request->getProperty("long_history_max_count"), 1000); } ///////////////////////////////////////////////////////////////////// // HistoryRequest ///////////////////////////////////////////////////////////////////// class HistoryRequest : public Request { const AlertConfig::CustomSql _sql; AlertId _startId; AlertId _endId; int _maxCount; void forceDone() { 
_maxCount = 0; } ShardList _shardList; ShardList::Shard const *_currentShard; // Null for the current alerts. void updateCurrentShard(); AlertId timeToAlertId(time_t time, DatabaseWithRetry &database); // We always report when we have data or when we are done. But we also have // periodic status messages at other times. The status message tells the // client how far we've gotten. That's useful in case of a disconnect. The // client doesn't have to go back to the beginning or the last data it // received. Instead it can start at the last place the server inspected. // So you don't lose anything on a disconnect. The question is how often // to send those status messages. Originally we sent one per timeslice, but // that often sent way too many messages. Now we report about once a second. // That means you could lose up to one second each time you restart, and on // average you expect to lose half a second. int64_t _nextReportTime; void updateNextReportTime(); public: HistoryRequest(ExternalHistoryRequest *request, WorkerThread *resources); const AlertConfig::CustomSql &getSql() const { return _sql; } bool done() const { return _maxCount <= 0; } void getHistory(ExternalRequest::MessageId messageId, std::string windowId, WorkerThread *resources); DayNumber getCurrentDay() const; }; DayNumber HistoryRequest::getCurrentDay() const { if (_currentShard) return _currentShard->day; else return DAY_NUMBER_UNKNOWN; } void HistoryRequest::updateCurrentShard() { // Call this after changing _startId. Notice that we always change _startId // at the end of a timeslice, so getCurrentDay will be correct when it's time // to pick a database server for the next time slice. _currentShard = _shardList.findAlertOrBefore(_startId); } // Do not copy this! 
New code should use dateToMysql() found in // ../shared/MiscSQL.[Ch] static std::string mysqlDate(time_t time) { if (time <= 0) return "0000-00-00"; struct tm brokenDown; localtime_r(&time, &brokenDown); char result[40]; sprintf(result, "%04d-%02d-%02d", brokenDown.tm_year+1900, brokenDown.tm_mon+1, brokenDown.tm_mday); return result; } AlertId HistoryRequest::timeToAlertId(time_t time, DatabaseWithRetry &database) { std::string sql = "SELECT id FROM "; ShardList::Shard const *const shard = _shardList.findDayOrGreater(mysqlDate(time)); AlertId defaultId; if (shard) { sql += shard->table; // If the time is greater than any time in this table, pick the highest // id in the table. So at least the day will be correct. It is tempting // to add 1 to the id, so a time after the last alert today will find the // first alert tomorrow. That would be consistent with the way we did // things before there were shards. But that might pick an id which // is not part of any shard, and that might cause trouble elsewhere in // this code. defaultId = shard->maxId; } else { sql += "alerts"; // -1 is used to find the greatest id anywhere. This is the default // if someone does not specify a begin or end time. defaultId = -1; } sql += " WHERE timestamp > FROM_UNIXTIME("; sql += itoa(time); sql += ") ORDER BY timestamp LIMIT 1"; MysqlResultRef result = database.tryQueryUntilSuccess(sql); return result->getIntegerField(0, defaultId); } HistoryRequest::HistoryRequest(ExternalHistoryRequest *request, WorkerThread *resources) : Request(request->getSocketInfo()), _sql(request->_sqlProducer.init(*resources->getReadOnlyDatabase())), _startId(0), _maxCount(request->_maxCount), _currentShard(NULL) { // Don't send any optional reports right away. Pause a little. updateNextReportTime(); // We have to convert the request so that it only uses alert ids. // This can only be done in the correct thread because we need to // access the database. 
TclList message; message<<__FILE__<<__LINE__<<__FUNCTION__ <<"_maxCount"<<_maxCount; _maxCount = std::min(3000, request->_maxCount); // Occasionally we get a request for a negative number! I don't // know why. But that causes but problems down the road. DatabaseWithRetry &database = *resources->getReadOnlyDatabase(); _shardList.load(database); const AlertId maxId = database.tryQueryUntilSuccess("SELECT MAX(id) FROM alerts") ->getIntegerField(0, std::numeric_limits< AlertId >::min()); const AlertId minId = _shardList.getMinId(); switch (request->_startMode) { case ExternalHistoryRequest::tmId: message<<"requested start id" <_startId; _startId = request->_startId; break; case ExternalHistoryRequest::tmTime: { message<<"requested start time" <_startTime); _startId = timeToAlertId(request->_startTime, database); break; } default: { message<<"requested start default"; _startId = -1; break; } } if ((_startId < minId) || (_startId > maxId)) { _startId = maxId; } message<<"effective start id"<<_startId; switch (request->_endMode) { case ExternalHistoryRequest::tmTime: { message<<"requested end time" <_endTime); _endId = timeToAlertId(request->_endTime, database); break; } default: { _endId = -1; } } message<<"proposed end id"<<_endId; if ((_endId < minId) || (_endId > maxId)) { _endId = minId; } message<<"effective end id"<<_endId; updateCurrentShard(); message<<"starting day number"<getSocketInfo()); } void HistoryRequest::updateNextReportTime() { // For simplicity wait one second for the next report. One second is // somewhat arbitrary. Perhaps this should be partially random. I'm finally // adding this code because of a huge request when we first start with a // big layout. In that case we have a lot of history requests all starting // at once. It might be nice to spread them out some. 
_nextReportTime = getMicroTime() + 1000000; } void HistoryRequest::getHistory(ExternalRequest::MessageId messageId, std::string windowId, WorkerThread *resources) { if (messageId.isEmpty()) { forceDone(); } else { // The is the newest alert, the alert with the highest id that we are // willing to accept. const AlertId newestAlert = _startId; // We will only accept alerts newer than this. AlertId oldestAlert = std::max(newestAlert - 10000, _endId); std::string tableName; if (_currentShard) { // From a historical table. tableName = _currentShard->table; // Remember that we will stop just before this value. That's why // I'm substracting 1. oldestAlert = std::max(oldestAlert, _currentShard->minId - 1); } else { // From the live data. tableName = "alerts"; oldestAlert = std::max(oldestAlert, _shardList.getMaxId()); } XmlNode message; XmlNode &group = message["HISTORY"]; group.properties["WINDOW"] = windowId; group.properties["TYPE"] = "alerts"; // We used to send a message to the user every time we got a time-slice. // That often led to lots of uninteresting messages. Now we only send // a message if it's interesting. Finding 1 or more alerts is one way to // guarantee that the message is interesting and it is sent to the user. bool alertsFound = false; if (newestAlert <= oldestAlert) { // The range is empty. group.properties["END"] = "range"; forceDone(); } else { // This is the most alerts that we will accept from the database. // This may be limited by the number that the client requested, or by // the total number we are allowed to put in one packet. const int maxCount = std::min(_maxCount, 250); // maxCount should always be larger than 0. If it was not, we // should not have gotten to the getHistory() function in the // first place. // We had a problem where maxCount was negative every once in a // while. This caused us to put a negative value in the LIMIT // clause. And that caused us to have a syntax error, and we could // never recover from that. 
I think the problem is solved. But // maybe we should double-check here, just to be safe. const std::string sql = _sql.getCommon(oldestAlert, newestAlert, maxCount, tableName); _shardList.sendShardToLog(newestAlert); std::string lastIdFound; for (MysqlResultRef result = resources->getReadOnlyDatabase()//range.position(newestAlert)) ->tryQueryUntilSuccess(sql); result->rowIsValid(); result->nextRow()) { _sql.copyAlert(group[-1], result, ahtHistory); lastIdFound = result->getStringField("id"); alertsFound = true; } const int alertCount = group.orderedChildren.size(); if (alertCount >= maxCount) { // The database stopped looking for results as soon as it gave // the last one to us. Next time, start with the next id. There // should never be a problem decoding the id, but if there is, // then we just end the history session by moving the history // back to 0. (If maxCount is 0, as it was before a previous // bug fix, then we will use this default. That caused the // request to terminate early, which was a graceful way to // handle the bug.) _startId = strtolDefault(lastIdFound, 1) - 1; } else { // The database stopped looking for results because it hit our // oldestAlert criteria. Start from there next time. _startId = oldestAlert; } updateCurrentShard(); // Since we're sending this to the client in pieces, it is possible // that the client will have to reconnect in the middle of the // request. In that case the client can give us back this value and // we will start from there. group.properties["RESTART"] = ntoa(_startId); // Update our counters to make sure we don't send any more data than // the client asked for or is allowed to have. _maxCount -= alertCount; } if (alertsFound /* This is the main reason to talk to the client. Send * the alerts that we've found. */ || done() /* Although there is a special "finish" message, that doesn't * contain all the information the client needs. 
For * historical reasons this message contains the next alert * id to start from or the special "range" message to say * we've found everything. */ || (getMicroTime() > _nextReportTime)) /* Periodically send a status message. */ { addToOutputQueue(getSocketInfo(), message.asString("API"), messageId); updateNextReportTime(); } } } ///////////////////////////////////////////////////////////////////// // CombinedHistoryRequest ///////////////////////////////////////////////////////////////////// class CombinedHistoryRequest : public WorkerThreadRequest { // The strange layout of this object reflects it's history. Originally // all of the work was done in the main thread, not in worker threads. // Originally the work from ExternalHistoryRequest was done as soon as we // first saw the request in the incoming queue and the work in HistoryRequest // was defered until the incoming queue was empty. private: bool _needToTerminate; ExternalHistoryRequest *_initial; HistoryRequest *_main; const std::string _windowId; ExternalRequest::MessageId _messageId; void sendDemoMessage() const; void sendTerminateMessage() const; public: CombinedHistoryRequest(ExternalRequest *request); ~CombinedHistoryRequest(); void doWork(WorkerThread *resources); const std::string &getWindowId() const { return _windowId; } const bool needToTerminate() const { return _needToTerminate; } void setMessageId(ExternalRequest::MessageId messageId) { _messageId = messageId; } }; CombinedHistoryRequest::CombinedHistoryRequest(ExternalRequest *request) : WorkerThreadRequest(request->getSocketInfo()), _needToTerminate(false), _initial(new ExternalHistoryRequest(request)), _main(NULL), _windowId(request->getProperty("window_id")) {} CombinedHistoryRequest::~CombinedHistoryRequest() { delete _initial; delete _main; } void CombinedHistoryRequest::doWork(WorkerThread *resources) { if (_initial) { _main = new HistoryRequest(_initial, resources); delete _initial; _initial = NULL; if (_main->done()) { // We need to 
check for the done condition before we // start. Sometimes we get a request from a client // which requests a negative number of alerts. So we // are done before we start. _needToTerminate = true; } } // Don't try to execute the first time through. Let the main thread give us // more information first. else if (_main) { switch (userInfoGetInfo(getSocketInfo()).status) { case sSuspended: { // This shouldn't happen any more. We now lock the socket // between the beginning and the end of the login process. assert(false); break; } case sLimited: { // DEMO users get no history. The client has little if any // concept of DEMO, so we have to disallow it and display a // message. sendDemoMessage(); _needToTerminate = true; break; } case sFull: { _main->getHistory(_messageId, _windowId, resources); if (_main->done()) { _needToTerminate = true; } break; } default: { // No permissions. Not logged in. _needToTerminate = true; break; } } } if (_needToTerminate && _main) { sendTerminateMessage(); delete _main; _main = NULL; } if (_main) setDayNumber(_main->getCurrentDay()); } void CombinedHistoryRequest::sendDemoMessage() const { if (_messageId.present()) { XmlNode message; XmlNode &group = message["HISTORY"]; group.properties["WINDOW"] = getWindowId(); group.properties["TYPE"] = "alerts"; XmlNode &alert = group["ALERT"]; alert.properties["SYMBOL"] = "****"; alert.properties["DESCRIPTION"] = "History is only available to paying customers. 
History is not available in DEMO mode."; addToOutputQueue(getSocketInfo(), message.asString("API"), _messageId); } } void CombinedHistoryRequest::sendTerminateMessage() const { if (_messageId.present()) { XmlNode message; XmlNode &group = message["HISTORY"]; group.properties["WINDOW"] = getWindowId(); group.properties["TYPE"] = "finished"; addToOutputQueue(getSocketInfo(), message.asString("API"), _messageId); } } ///////////////////////////////////////////////////////////////////// // HistoryHandler::ListenerInfo ///////////////////////////////////////////////////////////////////// CombinedHistoryRequest *HistoryHandler::ListenerInfo::find (std::string windowId) { return getPropertyDefault(_all, windowId); } void HistoryHandler::ListenerInfo::add(std::string windowId, CombinedHistoryRequest *request) { _all[windowId] = request; } void HistoryHandler::ListenerInfo::remove(std::string windowId) { _all.erase(windowId); } ///////////////////////////////////////////////////////////////////// // CancelHistoryRequest ///////////////////////////////////////////////////////////////////// class CancelHistoryRequest : public Request { const std::string _windowId; public: CancelHistoryRequest(SocketInfo *socket, std::string windowId) : Request(socket), _windowId(windowId) { } std::string getWindowId() { return _windowId; } }; ///////////////////////////////////////////////////////////////////// // HistoryHandler ///////////////////////////////////////////////////////////////////// void HistoryHandler::addWork(CombinedHistoryRequest *request) { _workerCluster.addWork(request); } HistoryHandler::HistoryHandler() : ThreadClass("HistoryHandler"), _incoming("HistoryHandler"), _workerCluster("history_databases", "History", mtWorkFinished, &_incoming) { CommandDispatcher::getInstance()->listenForCommand("history_listen", &_incoming, mtOpenChannel); startThread(); } HistoryHandler::~HistoryHandler() { Request *r = new Request(NULL); r->callbackId = mtQuit; _incoming.newRequest(r); 
waitForThread(); } HistoryHandler *HistoryHandler::_instance; void HistoryHandler::initInstance() { assert(!_instance); _instance = new HistoryHandler(); } HistoryHandler &HistoryHandler::getInstance() { return *_instance; } void HistoryHandler::threadFunction() { // This uses the standard layout for a thread function. We take items // from the _incoming queue as quickly as possible. We honer the quick // ones, like closing a socket or canceling a request, immediately. // The actual history requests can take some time. So we throw them // into a WorkerCluster queue. We look at the WorkerCluster when the // _incoming queue is empty. Requests can take way to long to satisfy // all at once, so we break them up. We don't want this thread to // block for too long. So each time we have take a work request we do // a small // amount of that request. If the request is not finished, we update it // so next time it will start where it left off this time, then we add // it back to the end of the WorkerCluster queue, for a round-robin // scheduler. while (true) { while (Request *current = _incoming.getRequest()) { switch (current->callbackId) { case mtOpenChannel: { // When the client requests a specific piece of data, // that request comes in the same way that we get // requests for realtime data. It is answered in the // same way. The actual data from the request is sent // in a streaming fashion. The user sends an // open_channel message to tell us where to send the // data. There is no response to this request. But // the results from any and all history requests are // delivered on this channel. 
ExternalRequest *request = dynamic_cast(current); _listenerInfo[request->getSocketInfo()].messageId = request->getResponseMessageId(); break; } case mtCancelRequest: { CancelHistoryRequest *request = dynamic_cast(current); cancelRequestImpl(request->getSocketInfo(), request->getWindowId()); ThreadMonitor::find().increment("canceledRequest"); break; } case mtAddRequest: { CombinedHistoryRequest *request = dynamic_cast(current); cancelRequestImpl(request->getSocketInfo(), request->getWindowId()); ListenerInfo &listener = _listenerInfo[current->getSocketInfo()]; request->setMessageId(listener.messageId); listener.add(request->getWindowId(), request); addWork(request); ThreadMonitor::find().increment("newRequest"); current = NULL; break; } case mtWorkFinished: { CombinedHistoryRequest *request = dynamic_cast(current); bool aborted = _workerCluster.acknowledge(request); if (!aborted) { current = NULL; SocketInfo *socket = request->getSocketInfo(); if (request->needToTerminate()) { cancelRequestImpl(socket, request->getWindowId()); delete request; ThreadMonitor::find().increment("finishedRequest"); } else { addWork(request); } } break; } case DeleteSocketThread::callbackId: { SocketInfo *socket = current->getSocketInfo(); _listenerInfo.erase(socket); _workerCluster.remove(socket); break; } case mtQuit: { delete current; return; } } delete current; } _workerCluster.matchAll(); _incoming.waitForRequest(); } } void HistoryHandler::cancelRequestImpl(SocketInfo *socket, std::string windowId) { ListenerInfo &listener = _listenerInfo[socket]; if (CombinedHistoryRequest *pendingRequest = listener.find(windowId)) { // If the request is in the _pending queue, then we should be able to // find it in listener.find(). But we get a lot of unnecessary cancels, // so we check first before calling the next line. 
_workerCluster.remove(pendingRequest); } listener.remove(windowId); } void HistoryHandler::cancelRequest(SocketInfo *socket, std::string windowId) { CancelHistoryRequest *r = new CancelHistoryRequest(socket, windowId); r->callbackId = mtCancelRequest; _incoming.newRequest(r); } void HistoryHandler::addRequest(CombinedHistoryRequest *request) { request->callbackId = mtAddRequest; _incoming.newRequest(request); } bool HistoryHandler::createHistoryRequest(ExternalRequest *request) { if (ExternalHistoryRequest::belongsToUs(request)) { addRequest(new CombinedHistoryRequest(request)); return true; } else { return false; } }