#include "../generate_alerts/misc_framework/AccumulateInsert.h" #include "../shared/GlobalConfigFile.h" #include "../shared/SimpleLogFile.h" #include "../shared/CommandDispatcher.h" #include "../shared/ReplyToClient.h" #include "CopyRows.h" ///////////////////////////////////////////////////////////////////// // Stored Procedures // // This is what we're testing with on Drama as of 7/8/2012 ///////////////////////////////////////////////////////////////////// /* drop procedure if exists wait_and_read_top_list; delimiter // create procedure wait_and_read_top_list (in start_after bigint) READS SQL DATA SQL SECURITY INVOKER begin declare last_found bigint; declare timeout datetime default now() + interval 15 second; loop1: loop select max(id) into last_found from top_list; do sleep(1.004); if (last_found > start_after) then leave loop1; end if; if (now() >= timeout) then leave loop1; end if; end loop loop1; select * from top_list where id > start_after and id <= last_found order by id limit 1000; end // delimiter ; GRANT EXECUTE ON PROCEDURE wait_and_read_top_list TO 'web_client'@'%'; drop procedure if exists wait_and_read_alerts; delimiter // create procedure wait_and_read_alerts (in start_after bigint) READS SQL DATA SQL SECURITY INVOKER begin declare last_found bigint; declare timeout datetime default now() + interval 15 second; loop1: loop select max(id) into last_found from alerts; do sleep(1.400); if (last_found > start_after) then leave loop1; end if; if (now() >= timeout) then leave loop1; end if; end loop loop1; select * from alerts where id > start_after and id <= last_found order by id limit 1000; end // delimiter ; GRANT EXECUTE ON PROCEDURE wait_and_read_alerts TO 'web_client'@'%'; */ ///////////////////////////////////////////////////////////////////// // CopyRows ///////////////////////////////////////////////////////////////////// CopyRows::~CopyRows() { Request *request = new Request(NULL); request->callbackId = mtQuit; _incoming.newRequest(request); waitForThread(); if (_monitor != &_source) delete _monitor; } CopyRows::CopyRows(std::string const &tableName, std::string const &destinationName) : ThreadClass("CopyRows from " + tableName + " to " + destinationName), _incoming(getName()), _realDatabase(getConfigItem("real_database", "mydb")), _tableName(tableName), _destinationName(destinationName), _readScriptName(getConfigItem("real_database", "mydb") + '.' + "wait_and_read_" + _tableName), _source("@source", getName() + " source"), _destination(destinationName, getName() + " destination"), _monitor((getConfigItem("database_monitor")=="")? &_source: new DatabaseWithRetry("@monitor", getName() + " monitor")), _lastId(0), _nextLogReportTime(0), _nextDatabaseReportTime(0) { assert(tableName != ""); assert(destinationName != ""); assert(_realDatabase != ""); const std::string setIdCommand = _realDatabase + '.' +_tableName + '_' + destinationName + "_set_id"; CommandDispatcher *cd = CommandDispatcher::getInstance(); cd->listenForCommand(setIdCommand, &_incoming, mtSetLastId); TclList msg; msg< destinations = explode(",", getConfigItem("destination_list")); const std::vector< std::string > tables = explode(",", getConfigItem("table_list")); for (std::vector< std::string >::const_iterator destination = destinations.begin(); destination != destinations.end(); destination++) for (std::vector< std::string >::const_iterator table = tables.begin(); table != tables.end(); table++) new CopyRows(*table, *destination); } static const std::string s_check_for_messages = "check_for_messages"; static const std::string s_check_for_copy = "check_for_copy"; void CopyRows::threadFunction() { ThreadMonitor &tm = ThreadMonitor::find(); findLastId(); while (true) { tm.setState(s_check_for_messages); while (Request *current = _incoming.getRequest()) { switch (current->callbackId) { case mtSetLastId: { ExternalRequest *request = dynamic_cast(current); SocketInfo *socket = request->getSocketInfo(); const int64_t oldId = _lastId; const int64_t newId = strtolDefault(request->getProperty("id"), -1); if (newId < 0) { // Presumably this means that the user didn't type the // command correctly. 0 is a valid value, meaning start // fresh. addToOutputQueue(socket, "Invalid message id\n" "(old id =" + ntoa(oldId) + ')', request->getResponseMessageId()); } else { _lastId = newId; addToOutputQueue(socket, "old id=" + ntoa(oldId) + ", new id=" + ntoa(newId), request->getResponseMessageId()); TclList msg; msg<= _nextDatabaseReportTime) { // If it's been 51 second since the last time we've reported a timestamp // to the database, do it now. The monitoring process will complain when // we are 3 minute behind or more. The read script times out after we // have been waiting for 15 seconds, so we can't be more precise than // that. So we expect to report about once a minute in the slow times, // and a little more often when alerts are flowing quickly. _nextDatabaseReportTime = time(NULL) + 51; const std::string sql = "UPDATE monitor_alive SET last_update=NOW() WHERE NAME='" + mysqlEscapeString(_tableName) + " -> " + mysqlEscapeString(_destinationName) + "'"; _monitor->tryQueryUntilSuccess(sql); } } static const std::string s_NULL = "NULL"; static const std::string s_CALL = "CALL "; static const std::string s_REPLACE_INTO = "REPLACE INTO "; static const std::string s_row = "row"; static const std::string s_read_source = "read source"; static const std::string s_write_dest = "write dest"; void CopyRows::checkForCopy() { ThreadMonitor &tm = ThreadMonitor::find(); const std::string originalState = tm.getState(); checkForDatabaseReport(); tm.setState("Request data"); MysqlResultRef result = _source.tryQueryUntilSuccess(s_CALL + _readScriptName + '(' + ntoa(_lastId) + ')', s_read_source); tm.setState("Examine data"); if (!result->rowIsValid()) { // No data! Typically the database will wait until there is data. // But there might be a timeout, making it easy for us to check for // other events. Currently the timeout is 14-15 seconds, but we have // no control over that. The timeout is stored on the database in // the stored procedure. tm.increment("Read no data"); tm.setState(originalState); return; } if (time(NULL) >= _nextLogReportTime) { // If it's been 5 minutes since the last time we've reported a timestamp // to the log, do it now. _nextLogReportTime = time(NULL) + 5 * 60; TclList msg; msg<getStringField("id", s_NULL) <<"timestamp"<getStringField("timestamp", s_NULL); sendToLogFile(msg); } tm.setState("Build header"); std::string header = s_REPLACE_INTO + _realDatabase + '.' +_tableName + '('; bool first = true; std::vector< std:: string > columns; result->getColumnNames(columns); for (std::vector< std:: string >::const_iterator it = columns.begin(); it != columns.end(); it++) { if (first) first = false; else header += ','; header += '`'; header += *it; header += '`'; } header += ")"; AccumulateInsert accumulator(header); tm.setState("Build body"); std::string newLastId; for (; result->rowIsValid(); result-> nextRow()) { tm.increment(s_row); std::string row; for (std::vector< std::string >::size_type i = 0; i < columns.size(); i++) { if (row.empty()) row = '('; else row += ','; if (result->fieldIsEmpty(i)) row += s_NULL; else { row += '"'; row += mysqlEscapeString(result->getStringField(i)); row += '"'; } } row += ')'; accumulator.add(row); if (accumulator.full()) { tm.increment("Send full"); _destination.tryQueryUntilSuccess(accumulator.get(), s_write_dest); } newLastId = result->getStringField("id"); } if (!accumulator.empty()) { tm.increment("Send partial"); _destination.tryQueryUntilSuccess(accumulator.get(), s_write_dest); } _lastId = strtolDefault(newLastId, _lastId); tm.setState(originalState); } void CopyRows::findLastId() { _lastId = _destination.tryQueryUntilSuccess("SELECT MAX(id) FROM " + _realDatabase + '.' + _tableName) ->getIntegerField(0, 0); TclList msg; msg<