// Implementation of DailyCandles: a data node that serves daily candles for
// one symbol from three places -- the live data feed (_fromRealtime), an
// in-memory cache of recent rows (_databaseCache), and the bars_d table.
//
// NOTE(review): this file appears to have lost text wherever a '<' was
// immediately followed by a letter (the text up to the next '>' is gone).
// Each place where that is visible is marked with NOTE(review) below.  The
// damaged fragments are kept exactly as found; restore them from version
// control before building.

// NOTE(review): the header name for this first #include is missing.
#include
#include "SplitList.h"
#include "../shared/MarketHours.h"
#include "../shared/DatabaseForThread.h"
#include "CandleClient.h"
#include "../shared/MiscSQL.h"
#include "../shared/ThreadMonitor.h"
#include "../shared/SimpleLogFile.h"
#include "../shared/GlobalConfigFile.h"
#include "DailyCandles.h"

// TODO When we update from the database we should check for changes.  If
// something has changed we should report it.  I.e. an old correction or a
// stock split.  Like we do for intraday data.

// Debug helper.  Starts from the base class dump, then appends _symbol, the
// realtime state (read while holding the read lock on _fromRealtime), and the
// dump of _databaseCache.
TclList DailyCandles::debugDump() const
{
  TclList result = CandleDataNode::debugDump();
  result<<"_symbol"<<_symbol;
  {
    // The read lock is held only for this inner scope.
    auto fromRealtime = _fromRealtime.readLock();
    // NOTE(review): text is missing after each single '<' below; presumably
    // these were "<<fromRealtime->currentState" and
    // "<<fromRealtime->candleStartTime" -- confirm.
    result<<"currentState"<currentState.debugDump()
          <<"candleStartTime"<candleStartTime;
  }
  result<<"_databaseCache"<<_databaseCache.debugDump();
  return result;
}

// Formats a time_t by forwarding to dateToMysql(time, false).
std::string DailyCandles::timeTToMysqlDate(time_t time)
{
  return dateToMysql(time, false);
}

// Read from daily_candle_cache_size.  If we disable caching, this will return
// the empty string.  (See cacheDisabled().)  Otherwise this will return a
// sanitized string suitable for use with "LIMIT" in MySql.
std::string const &DailyCandles::getCacheSize()
{
  static std::string result = []()->std::string{
    // This code will run the first time someone calls getCacheSize().  g++
    // automatically makes this thread safe.
    auto size = getConfigItem("daily_candle_cache_size", 0); // Default is off
    if (size == -1)
      size = 350; // Say -1 to use the recommended cache size.
    TclList msg;
    // FLF doesn't work well in a lambda.  The function is listed as ().
    // NOTE(review): a large span is missing after "msg<".  It swallowed the
    // rest of this lambda, the end of getCacheSize(), presumably the whole of
    // cacheDisabled(), and the header of the cache loader that follows
    // (called as getCacheFromDatabase(symbol) from the constructor).
    msg<;

    // ---- Remainder of the cache loader (function header missing) ----
    // Loads the most recent rows for `symbol` from bars_d into `result`.
    // `result` and `symbol` are declared in the missing text.
    assert(!cacheDisabled());
    const SplitList splitList(symbol);
    // The row count comes from getCacheSize(); the symbol is escaped before
    // it is spliced into the SQL.
    const std::string sql =
      "SELECT date, open, high, low, close, volume FROM bars_d WHERE symbol = \""
      + mysqlEscapeString(symbol)
      + "\" ORDER BY date DESC LIMIT "
      + getCacheSize();
    MysqlResultRef databaseResult =
      DatabaseForThread(DatabaseWithRetry::CANDLES)
      ->tryQueryUntilSuccess(sql, "Daily Cache");
    result->resize(databaseResult->numRows());
    // We requested the data in DESCENDING order.  That was required to make the
    // LIMIT clause work right, so we'd get the most recent data.  But we want to
    // store the data in our cache in ASCENDING order.  So we're traversing the
    // database result in the normal order, but traversing the vector in reverse
    // order.
    for (auto it = result->rbegin();
         it != result->rend();
         it++, databaseResult->nextRow())
    {
      assert(databaseResult->rowIsValid());
      SingleCandleWithTime &toAdd = *it;
      // The table stores a date; MARKET_HOURS_OPEN is added to get the
      // candle's start time.
      toAdd.startTime =
        mysqlToTimeT(databaseResult->getStringField(0)) + MARKET_HOURS_OPEN;
      toAdd.candle.open = databaseResult->getDoubleField(1, 0.0);
      toAdd.candle.high = databaseResult->getDoubleField(2, 0.0);
      toAdd.candle.low = databaseResult->getDoubleField(3, 0.0);
      toAdd.candle.close = databaseResult->getDoubleField(4, 0.0);
      toAdd.candle.volume = databaseResult->getIntegerField(5, 0);
      splitList.fixCandle(toAdd.startTime, toAdd.candle);
    }
    // Every row must have been consumed: the vector was sized from numRows().
    assert(!databaseResult->rowIsValid());
    // NOTE(review): text is missing after both single '<' characters below.
    // The gap swallowed the rest of the log statement, the end of the cache
    // loader, and the start of the next function, leaving only the tail of
    // what is presumably "if (!_tosData->getValid()) return;" -- confirm.
    sendToLogFile(TclList()<size()
                  <<"time in µs"<getValid())
      return;

    // ---- Remainder of the wakeup handler (function header missing) ----
    // Presumably DailyCandles::onWakeup(), which the constructor calls as
    // onWakeup(0) -- confirm.  Copies the latest TOS record into a candle
    // and publishes it to _fromRealtime.
    TosData const &last = _tosData->getLast();
    SingleCandle newCandle;
    newCandle.open = last.open;
    newCandle.high = last.high;
    newCandle.low = last.low;
    newCandle.close = last.todaysClose;
    newCandle.volume = last.volume;
    if (last.time > _resetTime)
    {
      // The print is past the current reset time: start a new candle.
      time_t newTime = midnight(last.time);
      // Next reset is 24 * MARKET_HOURS_HOUR after that midnight.
      _resetTime = newTime + 24 * MARKET_HOURS_HOUR;
      newTime += MARKET_HOURS_OPEN;
      auto fromRealtime = _fromRealtime.writeLock();
      fromRealtime->currentState = newCandle;
      fromRealtime->candleStartTime = newTime;
      fromRealtime->epochCounter.addEpoch(fromRealtime->candleStartTime);
    }
    else if (_fromRealtime.readLock()->currentState != newCandle)
    {
      // Same candle, but the values changed.  The start time is left alone.
      auto fromRealtime = _fromRealtime.writeLock();
      fromRealtime->currentState = newCandle;
      fromRealtime->epochCounter.addEpoch(fromRealtime->candleStartTime);
    }
}

// Intentionally empty: this node does nothing with broadcast messages.
void DailyCandles::onBroadcast(BroadcastMessage &message, int msgId)
{
}

// key is passed to the CandleDataNode base class; symbol is stored in _symbol.
// _databaseCache is constructed with !cacheDisabled() and a loader lambda that
// captures symbol by value and calls getCacheFromDatabase(symbol).
DailyCandles::DailyCandles(std::string const &key, std::string const &symbol) :
  CandleDataNode(key),
  _tosData(NULL),
  _symbol(symbol),
  _resetTime(0),
  _databaseCache(!cacheDisabled(),
                 [symbol]() { return getCacheFromDatabase(symbol); })
{
  // The rest of the setup is queued on the data node manager rather than run
  // here.  The lambda touches members, so [=] captures `this`.
  // NOTE(review): presumably the queue runs this on the thread that will own
  // writes to _fromRealtime -- confirm.
  getManager().getDataNodeManager().addToQueue([=](){
      _fromRealtime.setWriteThread();
      addAutoLink(GenericTosDataNode::find(this, 0, _tosData, _symbol));
      onWakeup(0);
    });
}

// Returns epochCounter.restartAt(epoch), read while holding the read lock on
// _fromRealtime.
time_t DailyCandles::threadSafeRestartAt(EpochCounter::Epoch epoch)
{
  return _fromRealtime.readLock()->epochCounter.restartAt(epoch);
}

/* Disabled code, kept for reference.  Note that it builds SQL from `symbol`
   without escaping it.

time_t DailyCandles::oldestCandle(std::string const &symbol)
{
  static time_t fromConfig =
    atolDefault(getConfigItem("oldest_daily_candle"), 0);
  if (fromConfig)
    return fromConfig;
  if (candleDatabase && !CandleClient::instance())
  {
    std::string sql = "SELECT UNIX_TIMESTAMP(COALESCE(MIN(date),CURDATE())) "
      "FROM bars_d WHERE symbol=\"" + symbol + '"';
    return candleData->tryQueryUntilSuccess(sql)->getIntegerField(0, 0);
  }
  return time(NULL);
}
*/

// Returns the smaller of the realtime candle's start time and a start time
// taken from the database cache.  A realtime start time of 0 is replaced via
// maximize() first, so it cannot win the std::min().
time_t DailyCandles::oldestFastTime()
{
  time_t result = _fromRealtime.readLock()->candleStartTime;
  if (!result)
    maximize(result);
  CachedCandles cached = _databaseCache.get();
  if (!cached->empty())
    // NOTE(review): the cache loader says it stores rows in ASCENDING order,
    // which makes rbegin() the newest cached candle, and threadSafeGet() uses
    // begin() for its analogous lower bound.  Confirm whether begin() was
    // intended here.
    result = std::min(result, cached->rbegin()->startTime);
  return result;
}

// ThreadMonitor label for the first startTimeList() overload.
static const std::string s_startTimeListFirst =
  "DailyCandles::startTimeListFirst";

// First call in a series.  Snapshots the realtime state, then builds a
// StartTimeList from what is in memory.
DailyCandles::StartTimeList DailyCandles::startTimeList(time_t startBefore)
{
  ThreadMonitor::SetState tm(s_startTimeListFirst);
  tm.increment(s_startTimeListFirst);
  SingleCandle currentState;
  time_t candleStartTime;
  {
    // Copy out under the read lock; the lock is released at the end of scope.
    auto fromRealtime = _fromRealtime.readLock();
    currentState = fromRealtime->currentState;
    candleStartTime = fromRealtime->candleStartTime;
  }
  if ((candleStartTime != 0)
      && (candleStartTime < startBefore)
      && !currentState.empty())
  {
    // There is a live candle that starts before the cutoff: list it first.
    // NOTE(review): raw new; presumably StartTimeList takes ownership of
    // `times` -- confirm.
    std::vector< time_t > *times = new std::vector< time_t >;
    times->push_back(candleStartTime);
    return StartTimeList(times, _databaseCache.get());
  }
  else
    return StartTimeList(startBefore, _databaseCache.get());
}

// ThreadMonitor label, and the query name, for the second startTimeList()
// overload.
static const std::string s_startTimeListMore =
  "DailyCandles::startTimeListMore";

// Follow-up call.  Reads up to 350 more dates from bars_d that come before
// previous.restartBefore(), newest first.
DailyCandles::StartTimeList DailyCandles::startTimeList(StartTimeList const &previous)
{
  if (!previous.moreData())
    return StartTimeList(); // We're already at the end.
  // Assume the first call to startTimeList() exhausted anything that's
  // in memory, so we go directly to the database.
  ThreadMonitor::SetState tm(s_startTimeListMore);
  tm.increment(s_startTimeListMore);
  // NOTE(review): raw new, allocated before the database call; presumably
  // StartTimeList takes ownership at the return below -- confirm.
  std::vector< time_t > *times = new std::vector< time_t >;
  std::string sql = "SELECT date FROM bars_d WHERE symbol = '"
    + mysqlEscapeString(_symbol)
    + "' AND date <'";
  if (secondOfTheDay(previous.restartBefore()) > MARKET_HOURS_OPEN)
    // Consistent with the cache.  If the candle at least started before the
    // given cutoff time, even if it ends after the cutoff time, try to
    // grab the value.
    sql += timeTToMysql(previous.restartBefore());
  else
    // We were asked to start before the open.  However, the database table
    // only stores dates.  By default "2018-12-21" < "2018-12-21 03:00:00" is
    // certainly true.  What we really meant was "2018-12-21 06:30:00" <
    // "2018-12-21 03:00:00" (which is false) because the candle really starts
    // at 6:30am, not midnight.  Or, equivalently, we compare "2018-12-21" <
    // "2018-12-21" to get false.
    sql += dateToMysql(previous.restartBefore(), false);
  sql += "' ORDER BY date DESC LIMIT 350";
  // NOTE(review): text is missing after the '<' in the commented-out log call
  // below.  The gap swallowed the declaration of databaseResult, leaving only
  // the tail of the query call.
  //sendToLogFile(TclList()<
  tryQueryUntilSuccess(sql, s_startTimeListMore);
  times->reserve(databaseResult->numRows());
  for (; databaseResult->rowIsValid(); databaseResult->nextRow())
    times->push_back(mysqlToTimeT(databaseResult->getStringField(0))
                     + MARKET_HOURS_OPEN);
  return StartTimeList(times);
}

// Increment once per call.  And a timer with this name for the entire call.
static const std::string s_threadSafeGet_n = "DailyCandles threadSafeGet n";

// If we need to get 1 or more candles from the database then we increment this
// by 1.
static const std::string s_threadSafeGet_SplitList =
  "DailyCandles threadSafeGet SplitList";

// Increment this by 1 for every candle we get from the database.
static const std::string s_threadSafeGet_database =
  "DailyCandles threadSafeGet database";

// Increment this by 1 for every candle we get from the database cache.
static const std::string s_threadSafeGet_cache =
  "DailyCandles threadSafeGet cache";

// Increment this by 1 for every candle that we return in the destination.
// s_threadSafeGet_database and s_threadSafeGet_cache refer to the number of
// 1 day candles we start from.  s_threadSafeGet_out refers to the number of
// daily, weekly, or monthly candles that we create.  The latter might be a
// lot smaller than the former if we build a lot of monthly candles.
static const std::string s_threadSafeGet_out =
  "DailyCandles threadSafeGet out";

// Grab n candles at once.
//
// destination is cleared first.  Each daily candle found is accumulated with
// += into the destination entry keyed by the start of the request it falls
// in.  Sources are read oldest first: the database, then the cache, then the
// live candle.
void DailyCandles::threadSafeGet(AllRowTimes const &requests, SingleCandle::ByStartTime &destination)
{
  // We are no longer supporting CandleClient::instance().  It would be
  // possible to make it work, but I don't think anyone's using it and it
  // would take some effort to test.
  ThreadMonitor::SetState tm(s_threadSafeGet_n);
  tm.increment(s_threadSafeGet_n);
  destination.clear();
  if (requests.empty())
  {
    // This looks suspicious, but the warning is causing more problems than
    // it's solving
    // NOTE(review): text is missing after the '<' in the commented-out log
    // call below.  The gap swallowed the rest of this if-block and the type
    // in the declaration of requestsMap.
    //sendToLogFile(TclList()<
    requestsMap;
  // Index the requests as start -> end.
  // NOTE(review): "×" below looks like a mangled "&times" (i.e. the loop
  // variable was "&times") -- confirm.
  for (RowTimes const × : requests)
    requestsMap[times.start] = times.end;
  // Overall span: the earliest start and the end of the last request.
  const time_t requestsBegin = requestsMap.begin()->first;
  const time_t requestsEnd = requestsMap.rbegin()->second;
  if (requestsEnd <= requestsBegin)
  {
    // It's tempting to fail an assertion here!
    // NOTE(review): text is missing after the '<' below.  The gap swallowed
    // the rest of this log call, the end of this if-block, and the start of
    // the `add` lambda that is called further down.  What remains is the tail
    // of a comparison against it->second.
    sendToLogFile(TclList()<= it->second)
      // it pointed to a likely request, but the request ended before the
      // given startTime.
      return;
    destination[it->first] += candle;
  };
  // Snapshot the live candle under the read lock.
  time_t dataFeedCandleStartTime;
  SingleCandle currentState;
  {
    auto fromRealtime = _fromRealtime.readLock();
    dataFeedCandleStartTime = fromRealtime->candleStartTime;
    currentState = fromRealtime->currentState;
  }
  if (!dataFeedCandleStartTime)
    // Use a very large number, instead of 0, to say no data.
    maximize(dataFeedCandleStartTime);
  CachedCandles cachedCandles = _databaseCache.get();
  time_t cacheMinTime = dataFeedCandleStartTime;
  if (!cachedCandles->empty())
    // Go to the database if a request comes before this time.
    cacheMinTime = std::min(cacheMinTime, cachedCandles->begin()->startTime);
  // We checked what was in memory first, so we knew what we'd need to get from
  // the database.  If there's an option we always prefer to get as much data
  // as possible from memory.  Then we read the daily candles in order, oldest
  // to newest, meaning that we process the results from the database first.
  if (requestsBegin < cacheMinTime)
  {
    tm.increment(s_threadSafeGet_SplitList);
    const SplitList splitList(_symbol);
    // Ask only for dates that memory cannot supply.  MARKET_HOURS_OPEN is
    // subtracted from both bounds here and added back to each row's date
    // below.
    const std::string sql =
      "SELECT date, open, high, low, close, volume FROM bars_d "
      "WHERE date >= '"
      + timeTToMysql(requestsBegin - MARKET_HOURS_OPEN)
      + "' AND date < '"
      + timeTToMysql(std::min(requestsEnd, cacheMinTime) - MARKET_HOURS_OPEN)
      + "' AND symbol = '"
      + mysqlEscapeString(_symbol)
      + "' ORDER BY date ASC";
    //TclList msg;
    // NOTE(review): text is missing after the '<' in the commented-out line
    // below.  The gap swallowed the declaration of candleCount and the start
    // of the for statement, leaving only the tail of its header.
    //msg<
    tryQueryUntilSuccess(sql, "DailyCandles::threadSafeGet");
         result->rowIsValid();
         result->nextRow())
    {
      candleCount++;
      const time_t startTime =
        mysqlToTimeT(result->getStringField(0)) + MARKET_HOURS_OPEN;
      SingleCandle candle;
      candle.open = result->getDoubleField(1, 0.0);
      candle.high = result->getDoubleField(2, 0.0);
      candle.low = result->getDoubleField(3, 0.0);
      candle.close = result->getDoubleField(4, 0.0);
      candle.volume = result->getIntegerField(5, 0);
      splitList.fixCandle(startTime, candle);
      // NOTE(review): text is missing after the '<' in the commented-out line
      // below.  The gap swallowed the rest of the database loop, the end of
      // the enclosing if-block, and the start of the cache loop.  What
      // remains is the tail of a std::lower_bound() call over cachedCandles;
      // both bounds are clamped with std::min() against
      // dataFeedCandleStartTime.
      //msg<
      begin(), cachedCandles->end(),
                             std::min(requestsBegin, dataFeedCandleStartTime),
                             SingleCandleWithTime::Comp()),
         end = std::lower_bound(it, cachedCandles->end(),
                                std::min(requestsEnd, dataFeedCandleStartTime),
                                SingleCandleWithTime::Comp());
       it != end;
       it++)
  {
    candleCount++;
    add(it->startTime, it->candle);
  }
  tm.increment(s_threadSafeGet_cache, candleCount);
  // Finally the live candle, if there is one.
  if (!currentState.empty())
    add(dataFeedCandleStartTime, currentState);
  tm.increment(s_threadSafeGet_out, destination.size());
  // NOTE(review): text is missing after the '<' in the commented-out log call
  // below.  The gap swallowed the end of threadSafeGet() and the header of
  // the next function, which asks the Manager for the DailyCandles registered
  // under `key` and supplies a factory lambda that constructs one with
  // new DailyCandles(key, symbol).
  //sendToLogFile(TclList()<
  (symbol);
  return Manager::getInstance().find< DailyCandles >(key, [&]() {
      return new DailyCandles(key, symbol);
    });
}