#include #include #include #include #include #include #include #include #include "../shared/ThreadSafeRefCount.h" #include "../shared/MarketHours.h" #include "../shared/ThreadClass.h" #include "../shared/Messages.h" #include "../shared/CommandDispatcher.h" #include "../shared/ReplyToClient.h" #include "../shared/SimpleLogFile.h" #include "../shared/DatabaseWithRetry.h" #include "../shared/XmlSupport.h" #include "../shared/TwoDLookup.h" #include "AlertConfig.h" #include "UserInfo.h" #include "../shared/GlobalConfigFile.h" #include "WorkerThread.h" #include "RTF.h" #include "NormalDistribution.h" #include "Types.h" #include "../shared/MiscSQL.h" #include "OddsMaker.h" // TO DO: Need to do a better job with the trailing stops. Currently we // use alerts, but at best those only work during the day. ///////////////////////////////////////////////////////////////////// // Global ///////////////////////////////////////////////////////////////////// static std::set< UserId > showDebugInfo; // The input s should be a valid rtf string. The output is the same string, // in red, green or the default color if d is negative, positive, or zero, // respectively. std::string colorByValue(rtf::ColorTable &colorTable, std::string s, double d) { if (d < 0) { return colorTable.foregroundColor(s, rtf::red); } else if (d > 0) { return colorTable.foregroundColor(s, rtf::green); } else { return s; } } static NormalDistribution *normalDistribution; enum { mtOpenChannel, mtRequestOdds, mtCancelOdds, mtMainFinished, mtDayFinished, mtWorkFinished, mtQuit }; static RequestQueue *destination; static std::string dateTimeToXL(time_t source) { if (source <= 0) return ""; tm brokenDownTime; localtime_r(&source, &brokenDownTime); char buffer[200]; const int count = strftime(buffer, 150, "%m/%d/%Y %H:%M:%S", &brokenDownTime); buffer[count] = 0; return buffer; } static void addTimeToTable(TwoDArray &response, std::string const &col, std::string const &row, time_t value) { if (value) { response.add(col, row, dateTimeToXL(localToNy(value))); static const std::string suffix = " (time_t)"; response.add(col + suffix, row, ntoa(value)); } } ///////////////////////////////////////////////////////////////////// // MeanAndStdDev // // This class accumulates data points and can provide the mean and // standard deviation at any time. ///////////////////////////////////////////////////////////////////// class MeanAndStdDev { private: double _sx; double _sxx; int _count; public: void clear(); MeanAndStdDev(); void add(double v); double getMean(); double getStdDev(); int getCount(); }; inline void MeanAndStdDev::clear() { _sx = 0; _sxx = 0; _count = 0; } inline MeanAndStdDev::MeanAndStdDev() { clear(); } inline void MeanAndStdDev::add(double v) { _sx += v; _sxx += v*v; _count++; } inline double MeanAndStdDev::getMean() { return _sx / _count; } inline double MeanAndStdDev::getStdDev() { // http://en.wikipedia.org/wiki/Standard_deviation // See the section titled "Rapid calculation methods". //return sqrt((_sxx - _sx * _sx / _count) / _count); double temp = (_sxx - _sx * _sx / _count) / _count; if (temp <= 0.0) { // If we see a negative number, assume that it was cased by roundoff // error. I have seen this case before, when there were 15 inputs, all // -0.01. The return value was -NAN. That caused problems in other // parts of the code, and we crashed. I'm not actually able to reproduce // that case, but looking at the core files that's the only thing that // makes sense. return 0.0; } else { return sqrt(temp); } } inline int MeanAndStdDev::getCount() { return _count; } ///////////////////////////////////////////////////////////////////// // PerDayStats // // This is the bottom part of the display, where we list a limited // summary of each day. ///////////////////////////////////////////////////////////////////// class PerDayStats { public: struct Day { std::string date; int count; double profit; Day() : count(0), profit(0) { } }; private: std::map< AlertId, Day > _byDay; public: void add(AlertId firstId, double profit) { Day &day = _byDay[firstId]; day.count++; day.profit += profit; } void initialize(AlertId firstId, std::string date) { // Convert from a complete mysql date to mm/dd std::string &toDisplay = _byDay[firstId].date; toDisplay = ""; if (date[5] != '0') { // No leading 0 on the month. toDisplay += date[5]; // Month } toDisplay += date[6]; // Month toDisplay += '/'; toDisplay += date[8]; // Day toDisplay += date[9]; // Day } typedef std::map< AlertId, Day >::const_iterator Iterator; Iterator begin() { return _byDay.begin(); } Iterator end() { return _byDay.end(); } std::string debugReport() { std::string result; for (std::map< AlertId, Day >::const_iterator it = _byDay.begin(); it != _byDay.end(); it++) { if (!result.empty()) { result += ", "; } // This should really use formatResult() rather than priceToString. result += priceToString(it->second.profit) + '/' + ntoa(it->second.count); } return result; } }; ///////////////////////////////////////////////////////////////////// // TradeDetail // // This contains one line of the details that we might send to a // user. This represents a single trade. ///////////////////////////////////////////////////////////////////// class TradeDetail { public: enum ReasonForExit { None, Timeout, ProfitTarget, StopLoss, ExitAlert }; typedef std::vector< TradeDetail > List; private: std::string _symbol; time_t _entryTime; double _entryPrice; double _timeoutPrice; time_t _timeoutTime; double _exitAlertPrice; time_t _exitAlertTime; double _profitTargetPrice; time_t _profitTargetTime; double _stopLossPrice; time_t _stopLossTime; bool _timeWarning; ReasonForExit _reasonForExit; double _exitPrice; bool _winner; double _moved; PropertyList _columnInfo; std::string _exchange; std::string _entryAlert; void addTo(TwoDArray &response, std::string const &id) const; public: void clear(); TradeDetail() { clear(); } void setSymbol(std::string const &symbol) { _symbol = symbol; } void setEntryTime(time_t time) { _entryTime = time; } void setEntryTime(std::string time) { setEntryTime(mysqlToTimeT(time)); } void setEntryPrice(double price) { _entryPrice = price; } void setTimeout(double price, time_t time) { _timeoutPrice = price; _timeoutTime = time; } void setTimeout(double price, std::string time) { setTimeout(price, mysqlToTimeT(time)); } void setExitAlert(double price, time_t time) { _exitAlertPrice = price; _exitAlertTime = time; } void setProfitTargetPrice(double price) { _profitTargetPrice = price; } void setProfitTargetTime(time_t time) { _profitTargetTime = time; } void setStopLossPrice(double price) { _stopLossPrice = price; } void setStopLossTime(time_t time) { _stopLossTime = time; } void setTimeWarning() { _timeWarning = true; } void setExit(ReasonForExit reason, double price) { _reasonForExit = reason; _exitPrice = price; } void setWinner(bool winner) { _winner = winner; } void setMoved(double moved) { _moved = moved; } PropertyList &getColumnInfo() { return _columnInfo; } PropertyList const &getColumnInfo() const { return _columnInfo; } void setExchange(std::string const &exchange) { _exchange = exchange; } void setEntryAlert(std::string const &alert) { _entryAlert = alert;} TclList debugDump() const; void addTo(TwoDArray &response) const; void addTo(TwoDArray &response, int id) const; }; void TradeDetail::addTo(TwoDArray &response) const { const std::string id = ntoa(response.getRowHeaders().size() + 1); addTo(response, id); } void TradeDetail::addTo(TwoDArray &response, int id) const { addTo(response, ntoa(id)); } void TradeDetail::addTo(TwoDArray &response, std::string const &id) const { response.add("Symbol", id, _symbol); addTimeToTable(response, "Entry Time", id, _entryTime); if (_entryPrice) response.add("Entry Price", id, priceToString(_entryPrice)); if (_timeoutPrice) response.add("Timeout Price", id, priceToString(_timeoutPrice)); addTimeToTable(response, "Timeout Time", id, _timeoutTime); if (_exitAlertPrice) response.add("Exit Alert Price", id, priceToString(_exitAlertPrice)); addTimeToTable(response, "Exit Alert Time", id, _exitAlertTime); if (_profitTargetPrice) response.add("Profit Target Price", id, priceToString(_profitTargetPrice)); addTimeToTable(response, "Profit Target Time", id, _profitTargetTime); if (_stopLossPrice) response.add("Stop Loss Price", id, priceToString(_stopLossPrice)); addTimeToTable(response, "Stop Loss Time", id, _stopLossTime); if (_timeWarning) response.add("Time Warning", id, "*"); switch (_reasonForExit) { case Timeout: response.add("Reason For Exit", id, "Timeout"); break; case ProfitTarget: response.add("Reason For Exit", id, "Profit Target"); break; case StopLoss: response.add("Reason For Exit", id, "Stop Loss"); break; case ExitAlert: response.add("Reason For Exit", id, "Exit Alert"); break; case None: break; } if (_exitPrice) response.add("Exit Price", id, priceToString(_exitPrice)); response.add("Result", id, _winner?"Winner":"Loser"); response.add("Moved", id, priceToString(_moved)); if (!_exchange.empty()) response.add("Exchange", id, _exchange); if (!_entryAlert.empty()) response.add("Entry Alert", id, _entryAlert); for (PropertyList::const_iterator it = _columnInfo.begin(); it != _columnInfo.end(); it++) response.add(it->first, id, it->second); } TclList TradeDetail::debugDump() const { TclList result; if (!_symbol.empty()) result<<"_symbol"<<_symbol; if (_entryTime) result<<"_entryTime"<first<second; } result<<"_columnInfo"< Ref; }; StreamingCsv::StreamingCsv() : _count(0) { int error = pthread_mutex_init(&_mutex, NULL); assert(!error); } StreamingCsv::~StreamingCsv() { int error = pthread_mutex_destroy(&_mutex); assert(!error); } std::string StreamingCsv::getHeader() { pthread_mutex_lock(&_mutex); const std::string result = _array.writeToCSV(); pthread_mutex_unlock(&_mutex); return result; } std::string StreamingCsv::getRow(TradeDetail const &tradeDetail) { pthread_mutex_lock(&_mutex); _count++; tradeDetail.addTo(_array, _count); const std::string result = _array.writeToCSV(false); _array.eraseAllRows(); pthread_mutex_unlock(&_mutex); return result; } ///////////////////////////////////////////////////////////////////// // OddsMakerCancel // // We have a lot of request objects all working together to satisfy one user // request. These objects might be in different states, like in a queue, in // a local variable, in another thread. Because of the different threads, some // objects might be deleted while others are not. We need a good way to cancel // all of the request objects at once. OddsMakerCancel is a wrapper around // a thread safe pointer. That makes it easy to cancel objects without knowing // each object's state. // // Note that this does not use a volatile pointer. That's not really required. // We only read and set this variable in one thread. We need the thread safe // smart pointer because any thread could delete this smart pointer, and any // object could be the last one holding the smart pointer. // // There is a lot of overlap between the idea of "aborting" a request in the // WorkerCluster and "canceling" a request using an OddsMakerCancel. Aborting // is an older idea that didn't quite do what we needed. It mostly worked, // but the OddsMaker required more. Note that when aborting failed we got // core dumps, which is why we created the OddsMakerCanel class. // // Here's more design info, including some recommendations for next time: // o I have a new plan for the OM core dumps. // o It's not entirely new, but I think it's inevitable now. // o You have a special state that's shared by all of the small requests. // o At a bare minimum it's a Boolean that says the original request has or // has not been abandoned. // o Maybe other data is in there. I don't want to make big changes now, but // ideally most of the master request would be in there. // o Every request contains a thread safe smart pointer to that object. // o Only the master/dispatcher thread can read or write that data, but any // thread might release a smart pointer to the data. // o There's also a smart pointer to that data in the ListenerInfo object, // which we were recently modifying (in revision 1.108 of OddsMaker.C). // o To clean up the helper objects, just set that Boolean to true. // o To clean up the main object, keep what we did today (revision 1.108) // (unless we want to rewrite a lot) but also check the new Boolean. // o So if the main object is idle, we delete it directly, but if it's out at // a helper thread, the new Boolean will eventually get it. ///////////////////////////////////////////////////////////////////// class OddsMakerCancel { private: NCTSRefCount< bool > _canceled; public: OddsMakerCancel() : _canceled(new bool(false)) { } void cancel() const { *_canceled = true; } bool isCanceled() const { return *_canceled; } }; ///////////////////////////////////////////////////////////////////// // OddsMakerInstructions // // This class describes the original request from the user, after // the one time initialization has been performed. Presumably each // worker will get a copy of this object because it will be running // in its own thread. Presumably each copy will be read only. Only // the initialization part of the code will need to modify this. // Each worker will have the same instructions. ///////////////////////////////////////////////////////////////////// class OddsMakerInstructions { private: // Abort a request in progress. OddsMakerCancel _cancel; // Housekeeping. Who made this request? Who receives the reply? SocketInfo * const _socket; const std::string _windowId; const ExternalRequest::MessageId _messageId; const bool _xmlMode; // Original request from the client. bool _successDirectionUp; bool _successTypePercent; double _successMinMove; int _entryTimeStart; int _entryTimeEnd; enum { ttTrade, // X minutes after the trade was entered. ttTodaysClose, // X minutes before the close. ttFutureOpen, // At the open of a future trading day. ttFutureClose // At the close of a future trading day. } _timeoutType; int _timeoutMinutes; // Minutes after the entry, or before the close. double _profitTarget; double _stopLoss; bool _stopLossWiggle; enum { ectNone, ectAuto, ectManual } _exitConditionType; enum { wDefault, wScottrade } _wording; bool _showDebugInfo; bool _returnCsvFile; bool _saveToAlternateServer; StreamingCsv::Ref _streamingCsv; int _daysOfTest; // Days where we look for an entry. int _daysToSkip; // We don't want to examine the most recent days. int _daysToHold; // Add this many days to the entry to find the exit. std::string _location; // Initialize these on our first timeslice. AlertConfig::CustomSql _entryGenerator; AlertConfig::CustomSql _exitGenerator; PropertyList _csvNames; ShardList _shardList; public: OddsMakerInstructions(ExternalRequest *externalRequest, ExternalRequest::MessageId messageId); bool isCanceled() const { return _cancel.isCanceled(); } OddsMakerCancel const &getCancelContainer() const { return _cancel; } std::string const &getWindowId() const { return _windowId; } ExternalRequest::MessageId getMessageId() const { return _messageId; } bool getXmlMode() const { return _xmlMode; } bool getSuccessDirectionUp() const { return _successDirectionUp; } bool getStopLossWiggle() const { return _stopLossWiggle; } bool exitConditionNone() const { return _exitConditionType == ectNone; } bool exitConditionManual() const { return _exitConditionType == ectManual; } bool exitConditionAuto() const { return _exitConditionType == ectAuto; } bool timeoutTypeTrade() const { return _timeoutType == ttTrade; } bool timeoutTypeTodaysClose() const { return _timeoutType == ttTodaysClose; } bool timeoutTypeFutureOpen() const { return _timeoutType == ttFutureOpen; } bool timeoutTypeFutureClose() const { return _timeoutType == ttFutureClose; } double getStopLoss() const { return _stopLoss; } double getProfitTarget() const { return _profitTarget; } int getEntryTimeStart() const { return _entryTimeStart; } int getEntryTimeEnd() const { return _entryTimeEnd; } bool getSuccessTypePercent() const { return _successTypePercent; } double getSuccessMinMove() const { return _successMinMove; } int getTimeoutMinutes() const { return _timeoutMinutes; } AlertConfig::CustomSql &getEntryGenerator() { return _entryGenerator; } AlertConfig::CustomSql &getExitGenerator() { return _exitGenerator; } AlertConfig::CustomSql const &getEntryGenerator() const { return _entryGenerator; } AlertConfig::CustomSql const &getExitGenerator() const { return _exitGenerator; } PropertyList &getCsvNames() { return _csvNames; } PropertyList const &getCsvNames() const { return _csvNames; } ShardList &getShardList() { return _shardList; } ShardList const &getShardList() const { return _shardList; } bool getScottradeWording() const { return _wording == wScottrade; } SocketInfo *getSocket() const { return _socket; } int getDaysOfTest() const { return _daysOfTest; } int getDaysToSkip() const { return _daysToSkip; } std::string const &getLocation() { return _location; } int getDaysToHold() const { return _daysToHold; } std::string intradayEndTime(std::string startTime) const; double direction() const { return _successDirectionUp?1.0:-1.0; } bool betterThan(double a, double b) const // Is a a better selling price? { return _successDirectionUp?(a > b):(a < b); } std::string formatResult(double result) const; void sendDebugMessage(std::string message) const; bool getReturnCsvFile() const { return _returnCsvFile; } StreamingCsv::Ref const &getStreamingCsv() const { return _streamingCsv; } int maxAlertsPerDay() const { return _saveToAlternateServer?1000:100; } bool saveToAlternateServer() const { return _saveToAlternateServer; } }; OddsMakerInstructions::OddsMakerInstructions(ExternalRequest *externalRequest, ExternalRequest::MessageId messageId) : _socket(externalRequest->getSocketInfo()), _windowId(externalRequest->getProperty("window_id")), _messageId(messageId), _xmlMode(externalRequest->getProperty("xml_mode") == "1"), _daysToHold(0) { std::string property; TclList invalid; property = externalRequest->getProperty("success_direction"); if (property == "+") { _successDirectionUp = true; } else if (property == "-") { _successDirectionUp = false; } else { invalid<<"success_direction"<getProperty("success_type"); if (property == "%") { _successTypePercent = true; } else if (property == "$") { _successTypePercent = false; } else { invalid<<"success_type"<getProperty("success_min_move"); _successMinMove = strtodDefault(property, std::numeric_limits< double >::infinity()); if (!finite(_successMinMove)) { _successMinMove = _successTypePercent?1:0.01; invalid<<"success_min_move"<getProperty("entry_time_start"); _entryTimeStart = strtolDefault(property, -1); if (_entryTimeStart < 0) { _entryTimeStart = 0; invalid<<"entry_time_start"<getProperty("entry_time_end"); _entryTimeEnd = strtolDefault(property, -1); if (_entryTimeEnd < 0) { _entryTimeEnd = 0; invalid<<"entry_time_end"< totalMinutesPerDay) { invalid<<"entry_time_start + entry_time_end" <<_entryTimeStart + _entryTimeEnd; _entryTimeStart = 0; _entryTimeEnd = 0; } property = externalRequest->getProperty("timeout_type"); if (property == "close") { // Exit on todays close. _timeoutType = ttTodaysClose; property = externalRequest->getProperty("before_close_minutes"); _timeoutMinutes = strtolDefault(property, 0); if (_timeoutMinutes < 0) { // We can't extend past the close. _timeoutMinutes = 0; invalid<<"before_close_minutes"< totalMinutesPerDay) { // We can't close the trades before we were allowed to open // them. The next case would have caught this, too, but // it would have correted it in a different way. _timeoutMinutes = 0; invalid<<"before_close_minutes"< _entryTimeEnd) { // We should stop requesting new positions if it's already time // to sell off our current positions. _entryTimeEnd = _timeoutMinutes; invalid<<"before_close_minutes"<getProperty("at_open_days"); _daysToHold = strtolDefault(property, -1); if (_daysToHold < 0) { _daysToHold = 0; invalid<<"at_open_days"<getProperty("at_close_days"); _daysToHold = strtolDefault(property, -1); if (_daysToHold < 1) { _daysToHold = 1; invalid<<"at_close_days"<getProperty("timeout_minutes"); _timeoutMinutes = strtolDefault(property, 0); if (_timeoutMinutes < 5) { _timeoutMinutes = 5; invalid<<"timeout_minutes"<getProperty("profit_target"); if (property.empty()) { _profitTarget = 0; } else { _profitTarget = strtodDefault(property, -1); if (_profitTarget < 0) { invalid<<"profit_target"<getProperty("stop_loss_wiggle"); if (property == "1") { _stopLossWiggle = true; } else if (property.empty()) { _stopLossWiggle = false; } else { invalid<<"stop_loss_wiggle"<getProperty("stop_loss"); if (property.empty()) { _stopLoss = 0; } else { _stopLoss = strtodDefault(property, -1); if (_stopLoss < 0) { invalid<<"stop_loss"<getProperty("exit_condition_type"); if (property == "none") { // Exit only because of time, not because of another alert. _exitConditionType = ectNone; } else if (property == "auto") { // Exit because of a traling stops. _exitConditionType = ectAuto; } else if (property == "manual") { // Exit because of a user configured alert. _exitConditionType = ectManual; } else { _exitConditionType = ectNone; invalid<<"exit_condition_type"<getProperty("wording"); if (property.empty()) { _wording = wDefault; } else if (property == "Scottrade") { _wording = wScottrade; } else { invalid<<"wording"<getProperty("show_debug_info"); if ((property == "") || (property == "0")) { _showDebugInfo = false; } else if (property == "1") { _showDebugInfo = true; } else { _showDebugInfo = false; invalid<<"show_debug_info"<getProperty("return_csv_file"); if ((property == "") || (property == "0")) _returnCsvFile = false; else if (property == "1") _returnCsvFile = true; else if (property == "streaming") { _returnCsvFile = true; _streamingCsv = new StreamingCsv(); } else { _returnCsvFile = false; invalid<<"return_csv_file"<getProperty("lots_of_results"); if ((property == "") || (property == "0")) _saveToAlternateServer = false; else if (property == "1") { if (getConfigItem("om_lots_of_results") == "1") _saveToAlternateServer = true; else { _saveToAlternateServer = false; invalid<<"lots_of_results"<<"Not valid on this server"; } } else { _saveToAlternateServer = false; invalid<<"lots_of_results"<getProperty("days_of_test"); _daysOfTest = strtolDefault(property, -1); if (_daysOfTest < 0) { _daysOfTest = 15; if (!property.empty()) { invalid<<"days_of_test"<getProperty("days_to_skip"); _daysToSkip = strtolDefault(property, -1); if (_daysToSkip < 0) { _daysToSkip = 0; if (!property.empty()) { invalid<<"days_to_skip"<getProperty("location"); if (property.empty()) { _location = "US"; } else { _location = property; } if (!((std::string)invalid).empty()) { sendDebugMessage((std::string)"Invalid inputs: " + (std::string)invalid); } } std::string OddsMakerInstructions::intradayEndTime(std::string startTime) const { // Look for the closing price at this time. int result; if (_timeoutType == ttTrade) { // Go forward the specified number of minutes after the trade. result = mysqlToSeconds(startTime) + _timeoutMinutes * 60; // Round off to the nearest 5 minute boundary. result = ((result + 150) / 300) * 300; // Don't go past the end of the day. result = std::min(result, MarketHours::close()); } else { // assert(_timeoutType == ttClose) result = MarketHours::close() - _timeoutMinutes * 60; } return secondsToMysql(result, startTime); } std::string OddsMakerInstructions::formatResult(double result) const { if (_successTypePercent) { return percentToString(result) + "%"; } else { return "$" + priceToString(result); } } void OddsMakerInstructions::sendDebugMessage(std::string message) const { if (_showDebugInfo) { XmlNode node; XmlNode &body = node["ODDSMAKER"]; body.properties["TYPE"] = "debug"; body.properties["WINDOW"] = _windowId; body.text = message; addToOutputQueue(_socket, node.asString("API"), _messageId); } } ///////////////////////////////////////////////////////////////////// // OddsMakerMainRequest ///////////////////////////////////////////////////////////////////// class OddsMakerDayRequest; class OddsMakerMainRequest : public WorkerThreadRequest { private: // These are the instructions which are shared with, and the same for, all // of the workers. OddsMakerInstructions _instructions; // Original request from the client. We store things here in the // constructor. These are used to finish initializing _instructions the // first time we can access the database. After that, use // _instructions.getEntryGenerator() and getExitGenerator() rather than // these. std::string _entryCondition; std::string _exitCondition; // Current state, so we know where to start the next time we get a timeslice. enum { csDoFirstInit, csReadyToCreateChildren, csWaitingForChilden, csFinalReport, csDone } _currentState; // Initialize this on our first timeslice. UserInfoExport _userInfo; std::vector< std::string > _dates; std::set< OddsMakerDayRequest * > _outstandingChildren; double _totalChildren; long _logRowId; // Results int _winnerCount; typedef std::multiset< double > AllResults; AllResults _allResults; PerDayStats _perDayStats; double _progress; // 0 - 100; typedef std::map< std::string, TradeDetail::List > AllTrades; AllTrades _allTrades; TimeVal _nextStatusTime; std::string percentSuccess(rtf::ColorTable &colorTable) const; void percentSuccess(XmlNode &node) const; void initializeRequest(WorkerThread *resources); void finalReport(WorkerThread *resources); double getResultByPercentile(double percentile); void sendProgressMessage(std::string message); void sendProgressMessage(XmlNode &message); // message might be destroyed. void sendCsvList(std::string message) const; void sendCsvListNormal(std::string message) const; void sendCsvListAlternate(std::string message) const; public: OddsMakerCancel const &getCancelContainer() const { return _instructions.getCancelContainer(); } bool isCanceled() const { return _instructions.getCancelContainer().isCanceled(); } RequestListener *getReturnListener() { return destination; } int getReturnMessageId() { return mtMainFinished; } void doWork(WorkerThread *resources); bool done() const { return _currentState == csDone; } bool readyToCreate() const { return _currentState==csReadyToCreateChildren; } bool waitingForChildren() const { return !_outstandingChildren.empty(); } void createChildren(WorkerCluster &workerCluster); OddsMakerMainRequest(ExternalRequest *externalRequest, ExternalRequest::MessageId messageId); const std::string &getWindowId(); void addTrade(AlertId firstIdOfTheDay, double move); void addWinners(int newWinners) { _winnerCount += newWinners; } void reportStatus(double progress); void describeDay(AlertId minId, std::string const &description); void takeTradeList(std::string const &date, TradeDetail::List &list); void childIsDone(OddsMakerDayRequest *child); void noChildren(); }; void OddsMakerMainRequest::takeTradeList(std::string const &date, TradeDetail::List &list) { if (!list.empty()) _allTrades[date].swap(list); } void OddsMakerMainRequest::describeDay(AlertId minId, std::string const &description) { _perDayStats.initialize(minId, description); } void OddsMakerMainRequest::childIsDone(OddsMakerDayRequest *child) { const bool somethingWasThere = _outstandingChildren.erase(child); assert(somethingWasThere); if (!waitingForChildren()) { _currentState = csFinalReport; } } void OddsMakerMainRequest::noChildren() { assert(!waitingForChildren()); _currentState = csFinalReport; } void OddsMakerMainRequest::reportStatus(double progress) { _progress += progress / _totalChildren; TimeVal now(true); if (now > _nextStatusTime) { _nextStatusTime = now.addMilliseconds(100); std::string percentString; if ((_progress > 0) && (_progress < 100)) { percentString = dtoaFixed(_progress, 2); } if (_instructions.getXmlMode()) { XmlNode message; XmlNode &status = message["WORKING"]; percentSuccess(status); if (!percentString.empty()) { status.properties["PERCENT_COMPLETE"] = percentString; } sendProgressMessage(message); } else { rtf::ColorTable colorTable; std::string progressMessage = percentSuccess(colorTable); progressMessage += " (still running"; if (!percentString.empty()) { progressMessage += ", " + percentString + "% complete"; } progressMessage += ')'; sendProgressMessage(colorTable + progressMessage); } } } void OddsMakerMainRequest::addTrade(AlertId firstIdOfTheDay, double move) { _allResults.insert(move); _perDayStats.add(firstIdOfTheDay, move); } void OddsMakerMainRequest::sendCsvList(std::string message) const { if (_instructions.saveToAlternateServer()) sendCsvListAlternate(message); else sendCsvListNormal(message); } void OddsMakerMainRequest::sendCsvListNormal(std::string message) const { XmlNode node; XmlNode &body = node["ODDSMAKER"]; body.properties["TYPE"] = "csv_list"; body.properties["WINDOW"] = _instructions.getWindowId(); body.text = message; addToOutputQueue(getSocketInfo(), node.asString("API"), _instructions.getMessageId()); } // I don't think this is a great solution. I hope it's temporary. One idea is // to send the rows one at a time, which will avoid big messages. That could // still cause a lot of traffic, if the OM is finding results quickly. void OddsMakerMainRequest::sendCsvListAlternate(std::string message) const { XmlNode node; XmlNode &body = node["ODDSMAKER"]; body.properties["TYPE"] = "csv_location"; body.properties["WINDOW"] = _instructions.getWindowId(); bool reportError = false; uuid_t uuid_packed; uuid_generate(uuid_packed); char uuid_asChars[37]; uuid_unparse(uuid_packed, uuid_asChars); const std::string baseName = std::string(uuid_asChars) + ".csv"; const std::string localFileName = "/tmp/" + baseName; const std::string remoteFileName = "bob-saget:/var/www/alldocs/main/OddsMakerResults/" + baseName; const std::string clientName = "http://www.trade-ideas.com/OddsMakerResults/" + baseName; body.text = clientName; { TclList msg; msg<getMasterDatabase(); // See if the user has permission to use this feature. if (_userInfo.status == sNone) { if (_instructions.getXmlMode()) { XmlNode message; XmlNode &error = message["ERROR"]; error.properties["CODE"] = "log in"; error.properties["MESSAGE"] = "Please log in to use the OddsMaker."; sendProgressMessage(message); } else { sendProgressMessage("Please log in to use the OddsMaker."); } _currentState = csDone; return; } else if (_userInfo.status == sLimited) { if (_instructions.getXmlMode()) { XmlNode message; XmlNode &error = message["ERROR"]; error.properties["CODE"] = "demo"; error.properties["MESSAGE"] = "The OddsMaker is not available in DEMO mode."; sendProgressMessage(message); } else { sendProgressMessage("The OddsMaker is not available in DEMO mode."); } _currentState = csDone; return; } else if (masterDatabase.tryQueryUntilSuccess ("SELECT oddsmaker_free - oddsmaker_total AS diff FROM users WHERE id=" + ntoa(_userInfo.userId))->getIntegerField(0, 1) <= 0) { // If not then display an error message and exit. // Note: A NULL value means an infinite allowance. //rtf::ColorTable colorTable; if (_instructions.getXmlMode()) { XmlNode message; XmlNode &error = message["ERROR"]; error.properties["CODE"] = "trials"; error.properties["MESSAGE"] = "You are out of free trials. " "Sign up for the OddsMaker to continue."; sendProgressMessage(message); } else { sendProgressMessage("You are out of free trials. " "Sign up for the OddsMaker to continue."); } _currentState = csDone; return; } DatabaseWithRetry &historicalDatabase = *resources->getReadOnlyDatabase(); //0.0); AlertConfig entryCondition; const bool requestColumnData = _instructions.getReturnCsvFile(); entryCondition.load(_entryCondition, _userInfo.userId, historicalDatabase, requestColumnData, requestColumnData); AlertConfig exitCondition; if (!_instructions.exitConditionNone()) exitCondition.load(_exitCondition, _userInfo.userId, historicalDatabase, false, false); // These were temporary variables. Inside of this method you can use // entryCondition and exitCondition. Later, use the generators, instead of // these. _entryCondition.clear(); _exitCondition.clear(); std::vector< std::string > sql; sql.push_back ("INSERT INTO oddsmaker_use" "(id,timestamp,user_id,settings,direction,exit_condition) " "VALUES(NULL,NOW()," + ntoa(_userInfo.userId) + ",'" + mysqlEscapeString(entryCondition.save()) + "','" + (_instructions.getSuccessDirectionUp()?"up":"down") + "'," + (_instructions.exitConditionNone()?"NULL": ("'" + exitCondition.save() + "'")) + ")"); sql.push_back("SELECT LAST_INSERT_ID()"); _logRowId = masterDatabase.tryAllUntilSuccess(sql.begin(), sql.end())[1]->getIntegerField(0, 0); // Do the one time processing on the alert configurations. entryCondition.saveToMru(_userInfo.userId, masterDatabase); entryCondition.removeIllegalData(_userInfo.userId, historicalDatabase); entryCondition .getCsvNames(_instructions.getCsvNames(), PairedFilterList(_userInfo.userId, historicalDatabase, false, false)); static std::string basicEntryFields = "alerts.symbol,price,id,timestamp,list_exch,alert_type"; static std::string wiggleEntryFields = basicEntryFields + ",relvol*volatility AS wiggle"; std::string where = _instructions.getStopLossWiggle()?wiggleEntryFields:basicEntryFields; entryCondition.customSql(_userInfo.userId, historicalDatabase, where, true, _instructions.getEntryGenerator()); _instructions.getEntryGenerator().setLimit(1); //_entryGenerator.setAlertIndex(_entryGenerator.aiUsePrimary); if (!_instructions.exitConditionNone()) { if (_instructions.exitConditionManual()) { exitCondition.saveToMru(_userInfo.userId, masterDatabase); } exitCondition.removeIllegalData(_userInfo.userId, historicalDatabase); static const std::string exitFields = "price, id, timestamp, unix_timestamp(timestamp) as UT"; exitCondition.customSql(_userInfo.userId, historicalDatabase, exitFields, true, _instructions.getExitGenerator()); _instructions.getExitGenerator().setLimit(1); } _instructions.getShardList().load(historicalDatabase); const std::string dateQuery = "SELECT day FROM holidays, candles_d WHERE day=date AND NOT find_in_set('" + mysqlEscapeString(_instructions.getLocation()) + "',closed) GROUP BY day ORDER BY day DESC LIMIT " + ntoa(_instructions.getDaysToSkip()) + "," + ntoa(_instructions.getDaysOfTest() + _instructions.getDaysToHold()); MysqlResultRef result = historicalDatabase.tryQueryUntilSuccess(dateQuery); _dates.reserve(result->numRows()); for (; result->rowIsValid(); result->nextRow()) { _dates.push_back(result->getStringField(0)); } std::reverse(_dates.begin(), _dates.end()); // This next test doesn't seem to do much. Maybe if I set skip to something // big I can see csFinalReport. (I didn't try that.) But when I set the // number of hold days to something big, this creates a lot days and sets // _currentState to csReadyToCreateChildren. But createChildren() returns // an empty list. _currentState = _dates.empty()?csFinalReport:csReadyToCreateChildren; //std::cout<<"_dates.size() = "<<_dates.size()<::const_iterator it = _dates.begin(); // it != _dates.end(); // it++) // std::cout<<" "<<*it<= (int)_allResults.size())) { // Should this be an assertion? return 0; } AllResults::const_iterator it = _allResults.begin(); std::advance(it, lower); double result = *it; if (lower != upper) { it++; result = (result + *it) / 2; } return result; } void OddsMakerMainRequest::finalReport(WorkerThread *resources) { DatabaseWithRetry &masterDatabase = *resources->getMasterDatabase(); masterDatabase.tryQueryUntilSuccess ("UPDATE oddsmaker_use SET end_time = NOW() WHERE id=" + ntoa(_logRowId)); // Manage free trials. std::string userId = ntoa(_userInfo.userId); masterDatabase.tryQueryUntilSuccess ("UPDATE users SET oddsmaker_total = oddsmaker_total + 1 WHERE id=" + userId); MysqlResultRef result = masterDatabase.tryQueryUntilSuccess ("SELECT oddsmaker_free - oddsmaker_total AS diff FROM users WHERE id=" + userId); bool paidFor = result->fieldIsEmpty(0); OddsMakerTrials trialsRemaining = result->getIntegerField(0, 0); rtf::ColorTable colorTable; std::string progressMessage; XmlNode message; XmlNode &summary = message["SUMMARY"]; if (_instructions.getXmlMode()) { percentSuccess(summary); } else { progressMessage = percentSuccess(colorTable); } if (!_allResults.empty()) { if (!_instructions.getXmlMode()) { progressMessage += (_instructions.getSuccessDirectionUp()?" up ":" down ") + rtf::bold(_instructions.formatResult(_instructions.getSuccessMinMove())); if (_instructions.timeoutTypeTrade()) { progressMessage += " in " + ntoa(_instructions.getTimeoutMinutes()) + " minutes; "; } else if (_instructions.timeoutTypeTodaysClose()) { if (_instructions.getTimeoutMinutes()) { progressMessage += " " + ntoa(_instructions.getTimeoutMinutes()) + " minutes before the close; "; } else { progressMessage += " at the close; "; } } else if (_instructions.timeoutTypeFutureOpen()) { switch (_instructions.getDaysToHold()) { case 1: progressMessage += " at next open; "; break; case 2: progressMessage += " at open after 1 day; "; break; default: progressMessage += " at open after "; progressMessage += ntoa(_instructions.getDaysToHold()-1); progressMessage += " days; "; break; } } else if (_instructions.timeoutTypeFutureClose()) { switch (_instructions.getDaysToHold()) { case 1: progressMessage += " at close after 1 day; "; break; default: progressMessage += " at close after "; progressMessage += ntoa(_instructions.getDaysToHold()); progressMessage += " days; "; break; } } // else assert false. } int winnerCount = 0; double winnerTotal = 0; int loserCount = 0; double loserTotal = 0; for (AllResults::const_iterator it = _allResults.begin(); it != _allResults.end(); it++) { double value = *it; if (value > 0) { winnerCount++; winnerTotal += value; } else if (value < 0) { loserCount++; loserTotal += value; } } if (_instructions.getXmlMode()) { summary.properties["WINNER_TOTAL"] = ntoa(winnerTotal); summary.properties["WINNER_COUNT"] = ntoa(winnerCount); summary.properties["LOSER_TOTAL"] = ntoa(loserTotal); summary.properties["LOSER_COUNT"] = ntoa(loserCount); summary.properties["BEST"] = ntoa(*_allResults.rbegin()); summary.properties["WORST"] = ntoa(*_allResults.begin()); } else { if (winnerCount) { progressMessage += "Average winner = " + rtf::bold(_instructions.formatResult(winnerTotal / winnerCount)) + ", "; } if (loserCount) { progressMessage += "Average loser = " + rtf::bold(_instructions.formatResult(loserTotal / loserCount)) + ", "; } double netWinnings = winnerTotal + loserTotal; progressMessage += "Net winnings = " + colorByValue(colorTable, rtf::bold(_instructions.formatResult(netWinnings)), netWinnings) + ", Best = " + rtf::bold(_instructions.formatResult(*_allResults.rbegin())) + ", Worst = " + rtf::bold(_instructions.formatResult(*_allResults.begin())); } if (_allResults.size() >= 7) { MeanAndStdDev stats; for (AllResults::const_iterator it = _allResults.begin(); it != _allResults.end(); it++) { stats.add(*it); } double mean = stats.getMean(); double casinoFactor; if (double stdDev = stats.getStdDev()) { double sumStdDev = stdDev / sqrt(stats.getCount()); double zScore = (mean - _instructions.getSuccessMinMove()) / sumStdDev; casinoFactor = normalDistribution->probability(zScore) * 100; //if (_showDebugInfo) //{ // sendDebugMessage("stdDev=" // + dtoa(stdDev, 4) // + ", sumStdDev=" // + dtoa(sumStdDev, 4) // + ", mean=" // + dtoa(mean, 4) // + ", zScore=" // + dtoa(zScore, 4) // + ", casinoFactor=" // + dtoa(casinoFactor) + '%'); //} } // We don't expect to see these cases. It would be unusual to get 7 // or more hits, all of them identical. else if (mean >= _instructions.getSuccessMinMove()) { casinoFactor = 100.0; } else { casinoFactor = 0.0; } if (_instructions.getXmlMode()) { summary.properties["CASINO_FACTOR"] = percentToString(casinoFactor); } else { progressMessage += "; "; progressMessage += (_instructions.getScottradeWording() ?"Statistical Confidence Summary" :"Casino Factor"); progressMessage += " = "; progressMessage += colorByValue(colorTable, rtf::bold(percentToString(casinoFactor) + '%'), casinoFactor - 50.0); } } if (!_instructions.getXmlMode()) { progressMessage += rtf::paragraph(); progressMessage += "Daily summary:"; } for (PerDayStats::Iterator it = _perDayStats.begin(); it != _perDayStats.end(); it++) { if (_instructions.getXmlMode()) { XmlNode &row = summary["DAILY"][-1]; row.properties["DATE"] = it->second.date; row.properties["TOTAL"] = dtoaFixed(it->second.profit, _instructions.getSuccessTypePercent()?2:4); row.properties["COUNT"] = ntoa(it->second.count); } else { progressMessage += rtf::paragraph(); double profit = it->second.profit; std::string profitStr = dtoaFixed(profit, _instructions.getSuccessTypePercent()?2:4); progressMessage += it->second.date + " : " + colorByValue(colorTable, profitStr, profit) + '/' + ntoa(it->second.count); } } } if (!paidFor) { if (_instructions.getXmlMode()) { summary.properties["TRIALS_REMAINING"] = ntoa(trialsRemaining); } else { progressMessage += rtf::paragraph(); if (trialsRemaining <= 0) { progressMessage += "No more free trials."; } else if (trialsRemaining == 1) { progressMessage += "One more free trial remains."; } else { progressMessage += ntoa(trialsRemaining); progressMessage += " free trials remain."; } } } if (_instructions.getXmlMode()) { sendProgressMessage(message); } else { sendProgressMessage(colorTable + progressMessage); } if (!_allTrades.empty()) { assert(!_instructions.getStreamingCsv()); TwoDArray response; for (AllTrades::const_iterator it = _allTrades.begin(); it != _allTrades.end(); it++) { TradeDetail::List const &list = it->second; for (TradeDetail::List::const_iterator it1 = list.begin(); it1 != list.end(); it1++) it1->addTo(response); } sendCsvList(response.writeToCSV()); } else if (StreamingCsv::Ref const &csv = _instructions.getStreamingCsv()) { XmlNode node; XmlNode &body = node["ODDSMAKER"]; body.properties["TYPE"] = "csv_header"; body.properties["WINDOW"] = _instructions.getWindowId(); body.text = csv->getHeader(); addToOutputQueue(getSocketInfo(), node.asString("API"), _instructions.getMessageId()); } //if (_showDebugInfo && !_allResults.empty()) // { // sendDebugMessage("25%=" + // formatResult(getResultByPercentile(25)) + // ", 50%=" + // formatResult(getResultByPercentile(50)) + // ", 75%=" + // formatResult(getResultByPercentile(75))); // } _currentState = csDone; } void OddsMakerMainRequest::doWork(WorkerThread *resources) { //std::cout<<"doWork _currentState="<<_currentState<getSocketInfo()), _instructions(externalRequest, messageId), _entryCondition(externalRequest->getProperty("entry_condition")), _exitCondition(externalRequest->getProperty("exit_condition")), _currentState(csDoFirstInit), _totalChildren(0.0), _logRowId(0), _winnerCount(0), _progress(0.0) { } ///////////////////////////////////////////////////////////////////// // OddsMakerDayRequest // // This class does the bulk of the work. It finds all of the matching entries // and exits. One object exists per day. This allows us to do more // multitasking. ///////////////////////////////////////////////////////////////////// class OddsMakerDayRequest : public WorkerThreadRequest { private: // These are the instructions which are shared with, and the same for, all // of the workers. const OddsMakerInstructions _instructions; // And our slice of the work: const int _startIndex; const std::vector< std::string > _listOfDates; std::string const &entryDate() const { return _listOfDates[_startIndex]; } std::string const &exitDate() const { return _listOfDates[_startIndex + _instructions.getDaysToHold()]; } // Current state, so we know where to start the next time we get a timeslice. enum { csStartNewDay, csFindNewEntry, csFindExitAlertInit, csFindExitAlertContinue, csFinishExit, csFinishTrade, csDone } _currentState; // Info about the day. // These are initialized to interesting values when we get our first time- // slice. But we initialize them to 0 in the constructor, in case someone // checks the value before then. This was required to fix a specific bug. std::string _minTime; std::string _maxTime; AlertId _minIdForTheDay; AlertId _maxIdForTheDay; AlertId _startId; AlertId _alertCountForTheDay; std::string _exclude; // Info about the trade. MysqlResultRef _entryDescription; std::string _symbol; std::string _debugMessage; TradeDetail _tradeDetail; time_t _exitAlertHitTime; double _exitAlertPrice; double _timeoutPrice; std::string _timeoutTime; std::string _entryTime; double _entryPrice; AlertId _exitAlertMaxId; AlertId _exitStartId; // We might be updating this more often than we need. For simplicity, every // time we change the start id, look this up. If there are no exit alerts, // we could probably just set this once. Even if there are exit alerts, we // aren't always looking at them. ShardList::Shard const *_currentShard; void setShard(AlertId id); void updateDayNumber(); // Results int _winnerCount; std::vector< double > _results; double _lastReportedPercentComplete; TradeDetail::List _tradeList; // Do internal work. void initializeDay(WorkerThread *resources); void searchForEntry(WorkerThread *resources); void searchForExitAlertInit(WorkerThread *resources); void searchForExitAlertContinue(WorkerThread *resources); void finishExit(WorkerThread *resources); void finishTrade(WorkerThread *resources); double getAlertPrice(MysqlResultRef &alert, DatabaseWithRetry &historicalDatabase); public: bool isCanceled() const { return _instructions.getCancelContainer().isCanceled(); } RequestListener * getReturnListener() { return destination; } int getReturnMessageId() { return mtDayFinished; } void doWork(WorkerThread *resources); void workInMainThread(OddsMakerMainRequest *main); bool done() const { return _currentState == csDone; } OddsMakerDayRequest(OddsMakerInstructions const &instructions, int startIndex, std::vector< std::string > const &dates); ~OddsMakerDayRequest(); const std::string &getWindowId() { return _instructions.getWindowId(); } }; void OddsMakerDayRequest::setShard(AlertId id) { _currentShard = _instructions.getShardList().findAlert(id); updateDayNumber(); } void OddsMakerDayRequest::updateDayNumber() { if (_currentShard) setDayNumber(_currentShard->day); else setDayNumber(DAY_NUMBER_UNKNOWN); } double OddsMakerDayRequest::getAlertPrice(MysqlResultRef &alert, DatabaseWithRetry &historicalDatabase) { double price = alert->getDoubleField("price", 0.0); std::string timestamp = alert->getStringField("timestamp"); std::string symbol = alert->getStringField("symbol"); std::string query = "SELECT high,low FROM candles_5m WHERE symbol='" + mysqlEscapeString(symbol) + "' AND '" + timestamp + "'<= end_time AND '" + timestamp + "' + INTERVAL 5 MINUTE > end_time"; // Note: We could use the MySql GREATEST fuction to implement this, // but this causes some problems. Suddenly exact values like "25.55" // become odd values like "25.549999237061" when you try to use GREATEST // or any other math function on them. MysqlResultRef result = historicalDatabase.tryQueryUntilSuccess(query); double high = result->getDoubleField("high", price); double low = result->getDoubleField("low", price); if (high < price) { price = high; } else if (low > price) { price = low; } return price; } void OddsMakerDayRequest::workInMainThread(OddsMakerMainRequest *main) { for (std::vector< double >::iterator it = _results.begin(); it != _results.end(); it++) { main->addTrade(_minIdForTheDay, *it); } _results.clear(); if (_winnerCount) { main->addWinners(_winnerCount); _winnerCount = 0; } double complete; if (done()) { complete = 100.0; } else { complete = _alertCountForTheDay / (double)_instructions.maxAlertsPerDay(); if (_startId && _minIdForTheDay && _maxIdForTheDay) { double completeByCoverage = (_startId - _minIdForTheDay) / (double)(_maxIdForTheDay - _minIdForTheDay); if (completeByCoverage > complete) { complete = completeByCoverage; } } complete *= 100.0; } if ((_currentState == csFindNewEntry) || (_currentState == csDone)) { // If we called this on every instance of csFindExitAlertContinue the // client would lock up! We should fix the client, but still, // csFindExitAlertContinue does not add anything interesting to the // status report. main->reportStatus(complete - _lastReportedPercentComplete); _lastReportedPercentComplete = complete; } if (done()) { // need to do this once: if (_minIdForTheDay > 0) { // but only if the day was not empty. main->describeDay(_minIdForTheDay, entryDate()); } main->takeTradeList(entryDate(), _tradeList); } } //static int64_t outstandingDayRequests = 0; OddsMakerDayRequest::OddsMakerDayRequest (OddsMakerInstructions const &instructions, int startIndex, std::vector< std::string > const &dates) : WorkerThreadRequest(instructions.getSocket()), _instructions(instructions), _startIndex(startIndex), _listOfDates(dates), _currentState(csStartNewDay), _minIdForTheDay(0), _maxIdForTheDay(0), _startId(0), _alertCountForTheDay(0), _currentShard(_instructions.getShardList().findDay(entryDate())), _winnerCount(0), _lastReportedPercentComplete(0.0) { updateDayNumber(); //int64_t count = __sync_add_and_fetch(&outstandingDayRequests, 1); //sendToLogFile(TclList()<getReadOnlyDatabase(); std::string const ¤tDate = entryDate(); // Start looking for alerts (to enter a trade) at this time. _minTime = secondsToMysql(MarketHours::open() + _instructions.getEntryTimeStart() * 60, currentDate); // Stop looking for alerts (to enter a trade) at this time. _maxTime = secondsToMysql(MarketHours::close() - _instructions.getEntryTimeEnd() * 60, currentDate); // Get first and last ids for the day. std::string sql[6]; sql[0] = "SELECT @t := NULL"; sql[1] = "SELECT @t := timestamp FROM " + _currentShard->table + " WHERE timestamp BETWEEN '" + _minTime + "' AND '" + _maxTime + "' ORDER BY timestamp LIMIT 1"; sql[2] = "SELECT MIN(id) FROM " + _currentShard->table + " WHERE timestamp=@t"; sql[3] = "SELECT @t := NULL"; sql[4] = "SELECT @t := timestamp FROM " + _currentShard->table + " WHERE timestamp BETWEEN '" + _minTime + "' AND '" + _maxTime + "' ORDER BY timestamp DESC LIMIT 1"; sql[5] = "SELECT MAX(id) FROM " + _currentShard->table + " WHERE timestamp=@t"; DatabaseWithRetry::ResultList result = historicalDatabase.tryAllUntilSuccess(&sql[0], &sql[6]); _minIdForTheDay = result[2]->getIntegerField(0, -1); _maxIdForTheDay = result[5]->getIntegerField(0, -1); if ((_minIdForTheDay < 0) || (_maxIdForTheDay < 0)) { _currentState = csDone; return; } _startId = _minIdForTheDay; setShard(_startId); //assert(_currentShard && "Initial shard does not exist!"); if (!_currentShard) { // This should not happen. We know this day must be in the list or we // not have gotten this far. But we still see thing sometimes. The // assertion has failed on numerous occasions, but somewhat randomly. // Definately not all the time, and definately not repeating over and // over like you might expect. // // It is possible that the database is in an inconsistent state. Bad // values in the database could cause us to get here. However, every // time I used gdb to track something down, I looked for _minIdForTheDay // in the datbase and did not see any problems. // // This isn't really a fatal state. This should be caught in // searchForEntry and we just end the day early. There isn't a need to // shut down the app. // // This should be fixed as of 12/23/2011. There was a bug where we // were saving a pointer to the main copy of the instructions, rather // than our own copy of the instructions. In just the right // circumstances we could try to access that memory after it had been // overwritten. At that point we were looking at (effectively) random // data. Most of the time it looked like our search feature was buggy, // but one time I got a NULL pointer error accessing the table name. // std::string should never do this! TclList msg; msg<<__FILE__<<__LINE__<<__FUNCTION__ <<"Initial shard does not exist!" <<"_startId" <<_startId <<"entryDate()" < _maxIdForTheDay) || (_alertCountForTheDay >= _instructions.maxAlertsPerDay())) { _currentState = csDone; return; } DatabaseWithRetry &historicalDatabase = *resources->getReadOnlyDatabase(); const AlertId endId = std::min(_maxIdForTheDay, std::min(_startId+9999, _currentShard->maxId)); std::string whereCondition = "price > 0 AND id BETWEEN " + ntoa(_startId) + " AND " + ntoa(endId) + " AND timestamp BETWEEN '" + _minTime + "' AND '" + _maxTime + "'"; if (_exclude != "") { whereCondition += " AND alerts.symbol NOT IN (" + _exclude + ")"; } if (_instructions.getStopLossWiggle()) { whereCondition += " AND relvol IS NOT NULL AND volatility IS NOT NULL "; } _instructions.getShardList().sendShardToLog(_startId); //const TimeVal start(true); _entryDescription = historicalDatabase.tryQueryUntilSuccess (_instructions.getEntryGenerator().get(whereCondition, 1, _currentShard->table)); //const TimeVal end(true); // The line above caused a segmentation violation. _currentShard was not // NULL. But _currentShard->table and _chrrentShared->date were invalid. // They both contained a data pointer that was NULL. That should never // happen. This happened on September 12, 2014 on chuck-liddell. /* TclList msg; msg<day <<(end.asMicroseconds()-start.asMicroseconds()) <<_entryDescription->rowIsValid() <<_instructions.getEntryGenerator().get(whereCondition, 1, _currentShard->table); sendToLogFile(msg); // This code found some really slow queries. These were fixed in revision // 1.132 of AlertConfig.C and revision 1.66 of AlertConfig.h */ if (!_entryDescription->rowIsValid()) { // We did not find an entry alert before or at endId. Keep looking. _startId = endId + 1; setShard(_startId); return; } _alertCountForTheDay++; _symbol = _entryDescription->getStringField("symbol"); _entryTime = _entryDescription->getStringField("timestamp"); _entryPrice = getAlertPrice(_entryDescription, historicalDatabase); _debugMessage = ntoa(_alertCountForTheDay); switch (_debugMessage.size()) { case 1: _debugMessage = "00" + _debugMessage; break; case 2: _debugMessage = "0" + _debugMessage; } _debugMessage = "#" + entryDate() + "_" + _debugMessage + " " + _symbol + ", Entry time=" + _entryTime + // We should really report when we fix a price using getAlertPrice(). ", Entry price=" + priceToString(_entryPrice); _tradeDetail.setSymbol(_symbol); _tradeDetail.setEntryTime(_entryTime); _tradeDetail.setEntryPrice(_entryPrice); _tradeDetail.setEntryAlert(_entryDescription->getStringField("alert_type")); _tradeDetail.setExchange(_entryDescription->getStringField("list_exch")); // Look at the timout. Do this first. If the other exit conditions // don't exist, we ignore them. If the timeout does not exist, we // throw away the entry condition. Otherwise you can get strange // results. If you try the same query twice, once with a trailing stop // and once without, you might get a different number of hits. bool timeoutPriceFound = false; { std::string timeoutPriceSql; if (_instructions.timeoutTypeFutureOpen() || _instructions.timeoutTypeFutureClose()) { timeoutPriceSql = "SELECT "; if (_instructions.timeoutTypeFutureOpen()) { timeoutPriceSql += "open, date + INTERVAL " + ntoa(MarketHours::open()); } else { timeoutPriceSql += "close, date + INTERVAL " + ntoa(MarketHours::close()); } timeoutPriceSql += " SECOND FROM candles_d WHERE symbol='" + mysqlEscapeString(_symbol) + "' AND date = '" + exitDate() + "'"; } else { timeoutPriceSql = "SELECT last_price, end_time FROM candles_5m " "WHERE symbol='" + mysqlEscapeString(_symbol) + "' AND end_time <= '" + _instructions.intradayEndTime(_entryTime) + "' AND end_time > '" + _entryTime + "' ORDER BY end_time DESC LIMIT 1"; } MysqlResultRef timeoutResult = historicalDatabase.tryQueryUntilSuccess(timeoutPriceSql); if (!(timeoutResult->fieldIsEmpty(0) || timeoutResult->fieldIsEmpty(1))) { timeoutPriceFound = true; _timeoutPrice = timeoutResult->getDoubleField(0, 0.0); _timeoutTime = timeoutResult->getStringField(1); _debugMessage += ", Timeout price="; _debugMessage += timeoutResult->getStringField(0); _debugMessage += ", Timeout time="; _debugMessage += _timeoutTime; _tradeDetail.setTimeout(_timeoutPrice, _timeoutTime); } } if (timeoutPriceFound) { _currentState = csFindExitAlertInit; } else { // Do not report anything. Jump to the clean up code. _currentState = csFinishTrade; } } void OddsMakerDayRequest::searchForExitAlertInit(WorkerThread *resources) { // Look for an exit alert. This could be an explicit alert from // the config window, or this could be a trailing stop. _currentState = csFinishExit; if (!_instructions.exitConditionNone()) { DatabaseWithRetry &historicalDatabase = *resources->getReadOnlyDatabase(); std::string endTimeSql[3]; // This is a different shard than we requested when we got this // timeslice. That's probably not terrible, but it would be nice to do // better. ShardList::Shard const *shard = _instructions.getShardList().findDay(std::string(_timeoutTime, 0, 10)); if (!shard) { // Presumably if we get this far, there should be a shard. The // _timeoutTime was selected by a number of means, including the // user input (exit 5 days later) and what was in the database // (candles_5m, holidays) so this value should be reasonable. But // it's possible that we just haven't created this shard yet. TclList msg; msg<table + " WHERE timestamp >= '" + _minTime + "' AND timestamp < '" + _timeoutTime + "' ORDER BY timestamp DESC LIMIT 1"; endTimeSql[2] = "SELECT MAX(id) FROM " + shard->table + " WHERE timestamp=@t"; _exitAlertMaxId = historicalDatabase .tryAllUntilSuccess(&endTimeSql[0], &endTimeSql[3]) [2]->getIntegerField(0, 0); if (!_exitAlertMaxId) { return; } _exitStartId = _entryDescription->getIntegerField("id", _exitAlertMaxId) + 1; setShard(_exitStartId); _currentState = csFindExitAlertContinue; } } void OddsMakerDayRequest::searchForExitAlertContinue(WorkerThread *resources) { // This can take way to long, so we had to split it up into smaller peices. if ((!_currentShard) || (_exitStartId > _exitAlertMaxId)) { /* TclList msg; msg< _exitAlertMaxId" <<(_exitStartId > _exitAlertMaxId?"yes":"no"); sendToLogFile(msg); */ _currentState = csFinishExit; return; } DatabaseWithRetry &historicalDatabase = *resources->getReadOnlyDatabase(); const AlertId exitEndId = std::min(_exitAlertMaxId, std::min(_exitStartId + 9999, _currentShard->maxId)); std::string exitAlertWhereCondition = "id BETWEEN " + ntoa(_exitStartId) + " AND " + ntoa(exitEndId) + " AND timestamp >= '" + _entryTime + "' AND timestamp < '" + _timeoutTime + "' AND alerts.symbol ='" + mysqlEscapeString(_symbol) + "'"; if (_instructions.getDaysToHold()) { // Avoid after hours trading, weekends and holidays. exitAlertWhereCondition += " AND TIME(timestamp) BETWEEN SEC_TO_TIME(" + ntoa(MarketHours::open()) + ") AND SEC_TO_TIME(" + ntoa(MarketHours::close()) + ") AND DATE(timestamp) IN ('" + entryDate() + "'"; for (int i = 1; i <= _instructions.getDaysToHold(); i++) { exitAlertWhereCondition += ",'"; exitAlertWhereCondition += _listOfDates[_startIndex + i]; exitAlertWhereCondition += "'"; } exitAlertWhereCondition += ")"; } _instructions.getShardList().sendShardToLog(_exitStartId); const std::string exitAlertSql = _instructions.getExitGenerator().get(exitAlertWhereCondition, 1, _currentShard->table); MysqlResultRef exitAlert = historicalDatabase.tryQueryUntilSuccess(exitAlertSql); /* TclList msg; msg<rowIsValid()?"yes":"no"); sendToLogFile(msg); */ if (exitAlert->rowIsValid()) { _exitAlertHitTime = exitAlert->getIntegerField("UT", 0); _exitAlertPrice = getAlertPrice(exitAlert, historicalDatabase); _debugMessage += ", Exit alert price="; _debugMessage += priceToString(_exitAlertPrice); _debugMessage += " found at "; _debugMessage += exitAlert->getStringField("timestamp"); _tradeDetail.setExitAlert(_exitAlertPrice, _exitAlertHitTime); _currentState = csFinishExit; return; } _exitStartId = exitEndId + 1; setShard(_exitStartId); } void OddsMakerDayRequest::finishExit(WorkerThread *resources) { // See if we hit a profit target or a stop loss. Merge this with data about // other possible exit conditions to find the final disposition of this // trade. DatabaseWithRetry &historicalDatabase = *resources->getReadOnlyDatabase(); // See if we ever hit the profit target. time_t profitTargetHitTime = 0; double profitTargetPrice = 0; if (_instructions.getProfitTarget()) { profitTargetPrice = _instructions.direction() * _instructions.getProfitTarget(); if (_instructions.getSuccessTypePercent()) { profitTargetPrice = _entryPrice * (1 + profitTargetPrice / 100); } else { profitTargetPrice = _entryPrice + profitTargetPrice; } _debugMessage += ", Profit target price="; _debugMessage += priceToString(profitTargetPrice); _tradeDetail.setProfitTargetPrice(profitTargetPrice); std::string sql = "SELECT MIN(end_time), unix_timestamp(MIN(end_time)) " "FROM candles_5m " "WHERE symbol = '" + mysqlEscapeString(_symbol) + "' AND " + (_instructions.getSuccessDirectionUp()?"high >= ":"low <= ") + dtoa(profitTargetPrice) + " AND end_time <= '" + _timeoutTime + "' AND end_time > '" + _entryTime + "' + INTERVAL 5 MINUTE"; // Notice that we skip the candle where the entry was. We // have no way to know if we hit the profit target before or // after the entry signal. MysqlResultRef profitTargetResult = historicalDatabase.tryQueryUntilSuccess(sql); if (!profitTargetResult->fieldIsEmpty(0)) { profitTargetHitTime = profitTargetResult->getIntegerField(1, 0); _debugMessage += " found before "; _debugMessage += profitTargetResult->getStringField(0); _tradeDetail.setProfitTargetTime(profitTargetHitTime); } } // See if we hit the stop loss. time_t stopLossHitTime = 0; double stopLossPrice = 0; if (_instructions.getStopLoss() || _instructions.getStopLossWiggle()) { stopLossPrice = _instructions.direction() * _instructions.getStopLoss(); if (_instructions.getSuccessTypePercent()) { stopLossPrice = _entryPrice * (1 - stopLossPrice / 100); } else { stopLossPrice = _entryPrice - stopLossPrice; } if (_instructions.getStopLossWiggle()) { // This should not be NULL. We've checked for it in the // initial query, and we should never have seen an entry // signaal if the wiggle was NULL. stopLossPrice -= _instructions.direction() * _entryDescription->getDoubleField("wiggle", 0.0); } _debugMessage += ", Stop loss price="; _debugMessage += priceToString(stopLossPrice); _tradeDetail.setStopLossPrice(stopLossPrice); std::string sql = "SELECT MIN(end_time), unix_timestamp(MIN(end_time)) " "FROM candles_5m " "WHERE symbol = '" + mysqlEscapeString(_symbol) + "' AND " + (_instructions.getSuccessDirectionUp()?"low <= ":"high >= ") + dtoa(stopLossPrice) + " AND end_time <= '" + _timeoutTime + "' AND end_time > '" + _entryTime + "' + INTERVAL 5 MINUTE"; // Notice that we skip the candle where the entry was. We // have no way to know if we hit the stop loss before or // after the entry signal. MysqlResultRef stopLossResult = historicalDatabase.tryQueryUntilSuccess(sql); if (!stopLossResult->fieldIsEmpty(0)) { stopLossHitTime = stopLossResult->getIntegerField(1, 0); _debugMessage += " found before "; _debugMessage += stopLossResult->getStringField(0); _tradeDetail.setStopLossTime(stopLossHitTime); } } // Determine which exit condition to use. std::string reasonForExit; double exitPrice = 0; if (profitTargetHitTime || stopLossHitTime || _exitAlertHitTime) { // For simplicity the adjusted time uses a very large number, // rather than 0, to represent an event that never happened. // Since we are looking for the first event, that amounts to // finding the lowest value. time_t profitTargetAdjustedTime = profitTargetHitTime? profitTargetHitTime: std::numeric_limits< time_t >::max(); time_t stopLossAdjustedTime = stopLossHitTime? stopLossHitTime: std::numeric_limits< time_t >::max(); // We want to compare the exit alert time to the profit target // and stop loss on a fair footing. Those are always quoted by // the time at the end of the candle. // 5 minutes * 60 seconds / minute = 300 seconds. // exitAlertAdjustedTime is the close time of the candle that // includes this alert. time_t exitAlertAdjustedTime = _exitAlertHitTime? (((_exitAlertHitTime+299)/300)*300): std::numeric_limits< time_t >::max(); // Look for cases when the two or more of the events are too // close to one another to know which one happened first. We // will pick one as the right answer, but we will report the // problem to the debug log. if (profitTargetHitTime && (profitTargetHitTime == stopLossHitTime)) { _debugMessage += ", Warning profit target and stop loss close together"; _tradeDetail.setTimeWarning(); } if (_exitAlertHitTime && (exitAlertAdjustedTime == profitTargetAdjustedTime)) { _debugMessage += ", Warning exit alert and profit target close together"; _tradeDetail.setTimeWarning(); } if (_exitAlertHitTime && (exitAlertAdjustedTime == stopLossHitTime)) { _debugMessage += ", Warning exit alert and stop loss close together"; _tradeDetail.setTimeWarning(); } // Find the time of the first event. int firstEventTime = std::min(profitTargetAdjustedTime, std::min(exitAlertAdjustedTime, stopLossAdjustedTime)); // Find the price associated with this time. There might be more // than one event associated with the time, so pick the best // price. This particular structure allows us to report the // source of the price, too. exitPrice = -_instructions.direction() * std::numeric_limits< double >::max(); if ((profitTargetAdjustedTime == firstEventTime) && _instructions.betterThan(profitTargetPrice, exitPrice)) { reasonForExit = "profit target"; exitPrice = profitTargetPrice; _tradeDetail.setExit(TradeDetail::ProfitTarget, profitTargetPrice); } if ((stopLossAdjustedTime == firstEventTime) && _instructions.betterThan(stopLossPrice, exitPrice)) { reasonForExit = "stop loss"; exitPrice = stopLossPrice; _tradeDetail.setExit(TradeDetail::StopLoss, stopLossPrice); } if ((exitAlertAdjustedTime == firstEventTime) && _instructions.betterThan(_exitAlertPrice, exitPrice)) { reasonForExit = "exit alert"; exitPrice = _exitAlertPrice; _tradeDetail.setExit(TradeDetail::ExitAlert, _exitAlertPrice); } } //if (profitTargetHitTime || stopLossHitTime || _exitAlertHitTime) else { reasonForExit = "timeout"; exitPrice = _timeoutPrice; _tradeDetail.setExit(TradeDetail::Timeout, _timeoutPrice); } _debugMessage += ", Reason for exit="; _debugMessage += reasonForExit; _debugMessage += ", Exit price="; _debugMessage += priceToString(exitPrice); // We now have an entry and and exit. Accumulate statistics on this // trade. double move = (exitPrice - _entryPrice) * _instructions.direction(); if (_entryPrice) { // Normally there would not be an entry price of 0. But it could be // possible for one of those wierd indexes to be 0. It might // even be some sort of bad data, although we do what we can // to avoid that. double percentMove = move * 100.0 / _entryPrice; //if (fabs(percentMove) >= 45) // We sometimes get numbers near +/- 50% because of a split. // We used to throw those away, but with the longer term trades, I // don't think we can do that any more. if (_instructions.getSuccessTypePercent()) { move = percentMove; } _results.push_back(move); bool winner = move >= _instructions.getSuccessMinMove(); if (winner) { _winnerCount++; } _debugMessage += winner?", Winner":", Loser"; _tradeDetail.setWinner(winner); _debugMessage += ", Moved " + _instructions.formatResult(move); _tradeDetail.setMoved(move); if (_instructions.getReturnCsvFile()) { for (PropertyList::const_iterator it = _instructions.getCsvNames().begin(); it != _instructions.getCsvNames().end(); it++) { _tradeDetail.getColumnInfo()[it->second] = _entryDescription->getStringField(it->first); } if (StreamingCsv::Ref const &csv = _instructions.getStreamingCsv()) { XmlNode node; XmlNode &body = node["ODDSMAKER"]; body.properties["TYPE"] = "csv_row"; body.properties["WINDOW"] = _instructions.getWindowId(); body.text = csv->getRow(_tradeDetail); addToOutputQueue(getSocketInfo(), node.asString("API"), _instructions.getMessageId()); } else _tradeList.push_back(_tradeDetail); } // We should always report progress if we get here. } // Entry price is not 0 _currentState = csFinishTrade; } void OddsMakerDayRequest::finishTrade(WorkerThread *resources) { // We will get here if we found an entry alert, even if it wasn't good // enough to report. Start looking for the next entry alert immeidately // after the previous entry alert. _startId = 1 + _entryDescription->getIntegerField("id", std::numeric_limits< AlertId >::max()); setShard(_startId); if (_exclude != "") { _exclude += ", "; } _exclude += "'" + mysqlEscapeString(_symbol) + "'"; _instructions.sendDebugMessage(_debugMessage); _currentState = csFindNewEntry; } void OddsMakerDayRequest::doWork(WorkerThread *resources) { switch (_currentState) { case csStartNewDay: initializeDay(resources); break; case csFindNewEntry: searchForEntry(resources); break; case csFindExitAlertInit: searchForExitAlertInit(resources); break; case csFindExitAlertContinue: searchForExitAlertContinue(resources); break; case csFinishExit: finishExit(resources); break; case csFinishTrade: finishTrade(resources); break; default: // This is only to avoid a compiler warning. break; } } ///////////////////////////////////////////////////////////////////// // ///////////////////////////////////////////////////////////////////// void OddsMakerMainRequest::createChildren(WorkerCluster &workerCluster) { assert(_outstandingChildren.empty()); int toSkip = _instructions.getDaysToHold(); const int toTest = std::max((int)(_dates.size() - toSkip), 0); for (int i = 0; i < toTest; i++) { OddsMakerDayRequest *const child = new OddsMakerDayRequest(_instructions, i, _dates); _outstandingChildren.insert(child); workerCluster.addWork(child); } _dates.clear(); _totalChildren = _outstandingChildren.size(); } ///////////////////////////////////////////////////////////////////// // ListenerInfo // // One of these per socket. It contains the message id for streaming // responses. It also allows us to look up a request by its client // id. That allows the client to cancel a request. // // This used to directly contain all of the requests associated with // each client request. Now ths only contains OddsMakerMainRequest // objects. Those contain pointers to their children. ///////////////////////////////////////////////////////////////////// class ListenerInfo { private: struct WindowInfo { OddsMakerMainRequest *mainRequest; OddsMakerCancel cancel; bool busy; WindowInfo(OddsMakerMainRequest *mainRequest) : mainRequest(mainRequest), cancel(mainRequest->getCancelContainer()), busy(false) { } // This is required by std::map. :( It will always follow this with an // assignment, so it's as if we just called the previous constructor // directly. I set the fields to values which are most likely to signal // errors down the road. It's tempting to have a special flag just for // this purpose, to make sure no one calls this constructor without doing // an assignment later. WindowInfo() : mainRequest(NULL), busy(true) { } }; std::map< std::string, WindowInfo > _byClientId; public: // For returning data to the client. ExternalRequest::MessageId messageId; // Finds the request given the client window id. Returns NULL if not found. OddsMakerMainRequest *find(std::string const &windowId) const { WindowInfo const *windowInfo = getProperty(_byClientId, windowId); if (!windowInfo) return NULL; else return windowInfo->mainRequest; } // Call remove() first if necessary. It is an error to add the same // windowId twice. void add(OddsMakerMainRequest *request, std::string const &windowId) { assert(!_byClientId.count(windowId)); _byClientId[windowId] = request; } // Call remove() first if necessary. It is an error to add the same // windowId twice. void add(OddsMakerMainRequest *request) { add(request, request->getWindowId()); } // It is safe to call this even if the windowId does not exist. void remove(std::string const &windowId) { _byClientId.erase(windowId); } // Do this when adding the main request to the cluster. // It must not already be busy. It would be bad to send a request to a // worker thread if that request is already at the worker thread. void setBusy(std::string const &windowId) { WindowInfo *windowInfo = getProperty(_byClientId, windowId); assert(windowInfo && !windowInfo->busy); windowInfo->busy = true; } void setBusy(OddsMakerMainRequest *request) { setBusy(request->getWindowId()); } // Do this when removing the main request from the cluster. // It must currently be in the busy state. Otherwise someone is confused. void setNotBusy(std::string const &windowId) { WindowInfo *windowInfo = getProperty(_byClientId, windowId); assert(windowInfo && windowInfo->busy); windowInfo->busy = false; } void setNotBusy(OddsMakerMainRequest *request) { setNotBusy(request->getWindowId()); } // True if and only if this request is in the worker cluster. bool isBusy(std::string const &windowId) { WindowInfo *windowInfo = getProperty(_byClientId, windowId); assert(windowInfo); return windowInfo->busy; } // This function will tell each request that we are done with it. We check // this flag each time a message is returned to the main thread. // // Note that OddsMakerDayRequest objects are almost always in the worker // cluster. If we receive one in the main thread, we store it in a local // variable for a short time then send it back to the worker cluster or // delete it. So calling this function should always be sufficient. // // OddsMakerMainRequests are more complicated. They might be hiding in this // object, waiting for an OddsMakerDayRequest to return them to action. Or // they might be at the worker cluster. In the latter case calling cancel() // is sufficient. In the former case you need to manually delete them. Use // isBusy() to know which is the case. void cancel(std::string const &windowId) { WindowInfo *windowInfo = getProperty(_byClientId, windowId); assert(windowInfo); // All requests point to the OddsMakerCancel object, not the other way // around, because it was easier that way. Maybe if I was starting fresh // all Request objects would be smart pointers. windowInfo->cancel.cancel(); } }; ///////////////////////////////////////////////////////////////////// // OddsMaker ///////////////////////////////////////////////////////////////////// class OddsMaker : private ThreadClass { private: std::map< SocketInfo *, ListenerInfo > _listenerInfo; RequestQueue _incoming; WorkerCluster _workerCluster; void cancelRequestImpl(SocketInfo *socket, std::string windowId); void terminateRequest(OddsMakerMainRequest *request); void initDebugInfo(); protected: void threadFunction(); public: OddsMaker(); ~OddsMaker(); }; inline void OddsMaker::initDebugInfo() { std::vector< std::string >pieces = explode(",", getConfigItem("debug_info")); for (std::vector< std::string >::const_iterator it = pieces.begin(); it != pieces.end(); it++) { // This is a list of user id's of people who can see the debug info. UserId id = strtolDefault(*it, -1); if (id > 0) { showDebugInfo.insert(id); } } } void OddsMaker::cancelRequestImpl(SocketInfo *socket, std::string windowId) { ListenerInfo &listener = _listenerInfo[socket]; if (OddsMakerMainRequest *const request = listener.find(windowId)) { // We used to call request->removeAll(_workerCluster) but we can't be // sure that no one has deleted the request object! listener.cancel(windowId); if (!listener.isBusy(windowId)) // Otherwise it will get deleted as soon as it returns to this thread. delete request; listener.remove(windowId); } } void OddsMaker::terminateRequest(OddsMakerMainRequest *request) { SocketInfo *socket = request->getSocketInfo(); ExternalRequest::MessageId messageId = _listenerInfo[socket].messageId; if (messageId.present()) { XmlNode message; XmlNode &group = message["ODDSMAKER"]; group.properties["WINDOW"] = request->getWindowId(); group.properties["TYPE"] = "finished"; addToOutputQueue(socket, message.asString("API"), messageId); } cancelRequestImpl(request->getSocketInfo(), request->getWindowId()); //delete request; Already done as part of cancelRequestImpl()!!! ThreadMonitor::find().increment("finishedRequest"); } OddsMaker::OddsMaker() : ThreadClass("OddsMaker"), _incoming("OddsMaker"), _workerCluster("history_databases", "OddsMaker", mtWorkFinished, &_incoming) { CommandDispatcher *dispatcher = CommandDispatcher::getInstance(); dispatcher->listenForCommand("oddsmaker_listen", &_incoming, mtOpenChannel); dispatcher->listenForCommand("oddsmaker_new_request", &_incoming, mtRequestOdds); dispatcher->listenForCommand("oddsmaker_cancel_request", &_incoming, mtCancelOdds); startThread(); } OddsMaker::~OddsMaker() { Request *r = new Request(NULL); r->callbackId = mtQuit; _incoming.newRequest(r); waitForThread(); } void OddsMaker::threadFunction() { // The layout for this function is based heavily on the layout for // HistoryHandler::threadFunction(). destination = &_incoming; ThreadMonitor::find().setState("initDebugInfo()"); initDebugInfo(); ThreadMonitor::find().setState("init NormalDistribution"); normalDistribution = new NormalDistribution(10000); ThreadMonitor::find().setState("working"); while (true) { while (Request *current = _incoming.getRequest()) { switch (current->callbackId) { case mtOpenChannel: { // All the streaming data goes down one channel. ExternalRequest *request = dynamic_cast(current); _listenerInfo[request->getSocketInfo()].messageId = request->getResponseMessageId(); break; } case mtRequestOdds: { ExternalRequest *request = dynamic_cast(current); SocketInfo *socket = request->getSocketInfo(); ListenerInfo &listener = _listenerInfo[socket]; OddsMakerMainRequest *persistant = new OddsMakerMainRequest(request, listener.messageId); cancelRequestImpl(socket, persistant->getWindowId()); listener.add(persistant); listener.setBusy(persistant); _workerCluster.addWork(persistant); ThreadMonitor::find().increment("newRequest"); break; } case mtCancelOdds: { ExternalRequest *request = dynamic_cast(current); cancelRequestImpl(request->getSocketInfo(), request->getProperty("window_id")); ThreadMonitor::find().increment("canceledRequest"); break; } case mtWorkFinished: assert(false); break; case mtMainFinished: { OddsMakerMainRequest *request = dynamic_cast(current); bool aborted = _workerCluster.acknowledge(request); if ((!aborted) && (!request->isCanceled())) { ListenerInfo &listener = _listenerInfo[request->getSocketInfo()]; listener.setNotBusy(request); current = NULL; if (request->done()) { // Normal termination. terminateRequest(request); } else if (request->readyToCreate()) { request->createChildren(_workerCluster); if (!request->waitingForChildren()) { // No children. Start the cleanup. This is the // same as right after the last child is done. request->noChildren(); _workerCluster.addWork(request); listener.setBusy(request); } } else { // Still working. Give it another timeslice when // one is available. _workerCluster.addWork(request); listener.setBusy(request); } } break; } case mtDayFinished: { OddsMakerDayRequest *request = dynamic_cast(current); SocketInfo *socket = request->getSocketInfo(); bool aborted = _workerCluster.acknowledge(request); if ((!aborted) && (!request->isCanceled())) { // update the main object with any new status. ListenerInfo &listener = _listenerInfo[socket]; if (OddsMakerMainRequest *main = listener.find(request->getWindowId())) { // This should never be null. This request should have // been aborted if main was null. request->workInMainThread(main); if (request->done()) { // Normal termination. main->childIsDone(request); if (!main->waitingForChildren()) { _workerCluster.addWork(main); ListenerInfo &listener = _listenerInfo[request->getSocketInfo()]; listener.setBusy(main); } } else { // Still working. Give it another timeslice when // one is available. current = NULL; _workerCluster.addWork(request); } } } break; } case DeleteSocketThread::callbackId: { SocketInfo *socket = current->getSocketInfo(); _listenerInfo.erase(socket); _workerCluster.remove(socket); break; } case mtQuit: { delete current; return; } } delete current; } _workerCluster.matchAll(); _incoming.waitForRequest(); } } ///////////////////////////////////////////////////////////////////// // Global ///////////////////////////////////////////////////////////////////// static OddsMaker *instance = NULL; void initOddsmaker() { assert(!instance); instance = new OddsMaker; }