#include #include "../shared/MarketHours.h" #include "OneMinuteCandles.h" #include "DailyCandles.h" #include "CandleDataNode.h" ///////////////////////////////////////////////////////////////////// // CandleDataNode::Manager ///////////////////////////////////////////////////////////////////// CandleDataNode::Manager *CandleDataNode::Manager::_instance = NULL; void CandleDataNode::Manager::initialize(DataNodeManager &dataNodeManager) { Manager *instance = new Manager(dataNodeManager); assertTrue(__sync_bool_compare_and_swap(&_instance, NULL, instance)); } void CandleDataNode::Manager::decrementReferenceCount (CandleDataNode *candleDataNode) { bool destroyThis; { auto nodes = _nodes.lock(); candleDataNode->_referenceCount--; destroyThis = candleDataNode->_referenceCount == 0; if (destroyThis) assertTrue(nodes->erase(candleDataNode->_key)); } if (destroyThis) getDataNodeManager().addToQueue([candleDataNode]() { // For simplicity we always do the delete in the data node thread. delete candleDataNode; }); } ///////////////////////////////////////////////////////////////////// // CandleDataNode::CachedCandlesHolder::NeedsLock ///////////////////////////////////////////////////////////////////// CandleDataNode::CachedCandlesHolder::NeedsLock::NeedsLock() : shutDown(false), newDataRequestPending(false), cacheExpires(0), candles(NULL) { } CandleDataNode::CachedCandlesHolder::NeedsLock::~NeedsLock() { assert(shutDown); } ///////////////////////////////////////////////////////////////////// // CandleDataNode::CachedCandlesHolder ///////////////////////////////////////////////////////////////////// CandleDataNode::CachedCandlesHolder::CachedCandlesHolder (bool enabled, std::function< CachedCandles() > const &getData) : _body(enabled?(new Body(getData)):NULL) { } CandleDataNode::CachedCandlesHolder::~CachedCandlesHolder() { if (_body) { _body->needsLock.lock()->shutDown = true; const auto toDelete = _body; // A lambda can't directly capture _body. 
Manager::getInstance().getThread().addLambdaToQueue([toDelete]() { delete toDelete; }); } } void CandleDataNode::CachedCandlesHolder::clearIfOld() { auto body = _body->needsLock.lock(); if (body->cacheExpires < time(NULL)) body->candles.clear(); } const CandleDataNode::CachedCandles CandleDataNode::CachedCandlesHolder::EMPTY_LIST = new std::vector< SingleCandleWithTime >; // If the cached items have been around this many seconds or more, discard // those items immediately. static const time_t CACHE_VALID_TIME = 6 * MARKET_HOURS_HOUR; // If you are trying to use the cache, and it has not yet expired, but it will // expire in this many seconds or fewer, go ahead and ask for the cache to be // refreshed now. We're just queuing up the request, which is very fast, then // we immediately return the data that's already sitting in the cache. We // don't actually wait until the new candles are ready. static const time_t CACHE_REREQUEST_TIME = MARKET_HOURS_HOUR; std::string CandleDataNode::CachedCandlesHolder::debugDump() const { TclList result; result<<_body; if (_body) { auto body = _body->needsLock.lock(); result<<"shutDown"<shutDown <<"newDataRequestPending"<newDataRequestPending <<"cacheExpires"<cacheExpires) <<"candles"; CachedCandles const &candles = body->candles; if (!candles) result<<"NULL"; else if (candles == EMPTY_LIST) result<<"EMPTY_LIST"; else { TclList candlesDump; candlesDump<size(); if (candles->size() > 0) { candlesDump<begin()->startTime); if (candles->size() > 1) candlesDump<rbegin()->startTime); } result<needsLock.lock(); assert(!body->shutDown); if (body->cacheExpires < now) body->candles.clear(); if ((!body->newDataRequestPending) && ((!body->candles) || (body->cacheExpires < now + CACHE_REREQUEST_TIME))) { body->newDataRequestPending = true; const auto b = _body; // A lambda can't directly capture _body. 
Manager::getInstance().getThread().addLambdaToQueue([b]() { if (b->needsLock.lock()->shutDown) return; CachedCandles newCandles = b->getData(); auto body = b->needsLock.lock(); if (body->shutDown) return; assert(body->newDataRequestPending); body->newDataRequestPending = false; body->cacheExpires = time(NULL) + CACHE_VALID_TIME; body->candles = newCandles; }); } return body->candles?body->candles:EMPTY_LIST; } ///////////////////////////////////////////////////////////////////// // CandleDataNode::StartTimeList ///////////////////////////////////////////////////////////////////// void CandleDataNode::StartTimeList::clear() { _firstEnd = _end = 0; _oldest = 0; _firstContainer.clear(); _secondContainer.clear(); } CandleDataNode::StartTimeList::StartTimeList (TSRefCount< std::vector< time_t > > const ×) : _firstContainer(times) { if (!times) { // No data in this list and no reason to ask for the next list. _end = _firstEnd = 0; _oldest = 0; } else { _first = times->begin(); _end = _firstEnd = times->size(); if (_end) _oldest = *(_first + (_end - 1)); // The last item in the input. else _oldest = 0; // No more data, don't try again. } } CandleDataNode::StartTimeList::StartTimeList (TSRefCount< std::vector< time_t > > const &realTime, CachedCandles const &cached) : _firstContainer(realTime), _secondContainer(cached) { assert(!realTime->empty()); _first = _firstContainer->begin(); _end = _firstEnd = _firstContainer->size(); _oldest = *(_first + (_end - 1)); // The last item in the input. 
if (cached) { auto end = std::lower_bound(_secondContainer->begin(), _secondContainer->end(), _oldest, SingleCandleWithTime::Comp()); int size = end - _secondContainer->begin(); if (size) { _end += size; _second = _secondContainer->begin() + (size - 1); _oldest = _secondContainer->begin()->startTime; } } } CandleDataNode::StartTimeList::StartTimeList(time_t startBefore, CachedCandles const ×) : _firstEnd(0), _secondContainer(times), _end(0), _oldest(startBefore) { if (!times) return; // No data. const auto t = &*times; auto end = std::lower_bound(t->begin(), t->end(), startBefore, SingleCandleWithTime::Comp()); auto size = end - t->begin(); _end = size; if (!size) return; // All data filtered out. // *_second is the first piece of valid data. We walk backwards from there. _second = t->begin() + (size - 1); } ///////////////////////////////////////////////////////////////////// // CandleDataNode ///////////////////////////////////////////////////////////////////// // This just calls the other version of threadSafeGet() as a simple wrapper. // The algorithms for getting the data from three sources and merging them // together is getting too complicated to write AND TEST twice. // // Originally we only had this version of threadSafeGet() and you'd have to // call it over and over. We added the other version for performance. This // version is still called in a few cases, but not many. 
// Single-request convenience form: wraps {start, end} in a one-element
// request list, hands it to the multi-request overload of threadSafeGet(),
// and returns the entry that overload stored under `start`.
// Each call is counted in the ThreadMonitor under
// "CandleDataNode::threadSafeGet".
SingleCandle CandleDataNode::threadSafeGet(time_t start, time_t end)
{
  ThreadMonitor::find().increment("CandleDataNode::threadSafeGet");
  const AllRowTimes requests = { {start, end} };
  SingleCandle::ByStartTime destination;
  threadSafeGet(requests, destination);
  // NOTE(review): presumably ByStartTime is a map keyed by start time, so a
  // request that produced no result yields a default SingleCandle -- confirm.
  return destination[start];
}

// Describes this node for debugging, starting with one entry per item in
// _broadcastRequests.
//
// NOTE(review): this function is damaged.  Everything between the "<" after
// `broadcastRequests` and the ">" of a later "->" was stripped by a text
// extraction step.  That removed the rest of debugDump() and the signature
// and opening of the following function, which ends in a call to release().
// The remaining tokens are kept exactly as found; restore the original from
// version control.
TclList CandleDataNode::debugDump() const
{
  TclList broadcastRequests;
  for (BroadcastRequest const &request : _broadcastRequests)
    broadcastRequests<release();
}

// Looks up the node for a symbol: intraday requests are forwarded to
// OneMinuteCandles::find(), all others to DailyCandles::find().
CandleDataNode *CandleDataNode::find(std::string const &symbol, bool intraday)
{
  if (intraday)
    return OneMinuteCandles::find(symbol);
  else
    return DailyCandles::find(symbol);
}

// Forwards to the Manager, which owns the reference counting for this node.
void CandleDataNode::decrementReferenceCount()
{
  getManager().decrementReferenceCount(this);
}