// System headers (inferred from what the code below uses).
#include <assert.h>
#include <math.h>
#include <float.h>
#include <limits>
#include <string>
#include <set>
#include <map>
#include <vector>
#include <iostream>

#include "../shared/ReplyToClient.h"
#include "../shared/SelectableRequestQueue.h"
#include "../shared/CommandDispatcher.h"
#include "../shared/LogFile.h"
#include "../shared/SimpleLogFile.h"

#include "WorkerThread.h"
#include "UserInfo.h"
#include "TopListConfig.h"
#include "TopListServerConnection.h"
#include "TopList.h"

/////////////////////////////////////////////////////////////////////
// TopListQueueItem
//
// This thread can handle multiple types of requests.  These all
// have similar implementations.  More to the point, they all send
// similar queries to the database, so they can all wait in the same
// queue.  I didn't want to create a bunch of threads and database
// connections when I added the new request types.
//
// The requests all look similar in implementation.  They are all
// based on the idea of a top list.  They look significantly
// different to the client software and the end user.
//
// TopListQueueItem is mostly an interface, which the thread can
// handle without caring about the details.  However, this class
// implements some common features just because it was simpler
// that way and I didn't want to duplicate a lot of code.
//
// Note:  When I first created TopListQueueItem, TopListRequest
// was the only subclass.  I created TopListQueueItem by breaking
// TopListRequest into two classes.  SymbolListCount is coming
// very soon.  That class is the main reason I created the
// TopListQueueItem class.  Whenever we get to it, it seems like
// the heat map will require something similar to SymbolListCount.
/////////////////////////////////////////////////////////////////////

class TopListQueueItem : public WorkerThreadRequest
{
protected:
  // Housekeeping.  Who made this request?  Who receives the reply?
  const ExternalRequest::MessageId _controlMessageId;
  const UserId _userId;

  // Original request from the client.
  const std::string _config;

  // Current state.
  bool _done;
  /* This request should be deleted the next time it's in the main
   * thread.  This can only be set from the worker thread.  If the
   * main thread wants to delete the request, and the request is
   * in the worker, the main thread records that elsewhere. */

  TimeVal _nextTime;  // When should we be called again.

  // Flag to determine if the client can handle display-only fields.
  bool _allowNonFilterColumns;

  TopListQueueItem(ExternalRequest *request);

public:
  virtual void doWork(WorkerThread *resources) =0;
  bool isDone() { return _done; }
  TimeVal const &getNextTime() { return _nextTime; }
  virtual void setDataMessageId(ExternalRequest::MessageId messageId) { }
};

TopListQueueItem::TopListQueueItem(ExternalRequest *request) :
  WorkerThreadRequest(request->getSocketInfo()),
  _controlMessageId(request->getResponseMessageId()),
  _userId(userInfoGetInfo(getSocketInfo()).userId),
  _config(request->getProperty("long_form")),
  _done(false),
  _nextTime(true),
  _allowNonFilterColumns(request->getProperty("non_filter_columns") == "1")
{
  _nextTime.addMicroseconds(1000);
}

/////////////////////////////////////////////////////////////////////
// TopListAccumulator
//
// This is responsible for the X-axis on the histograms.  It finds
// the min and max, of course.  But it also decides on things like
// a log vs. linear scale, and it rounds (in the appropriate
// direction) to avoid crazy decimals.
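//
// A rough usage sketch (descriptive only; the real callers are the request
// classes later in this file):
//
//   TopListAccumulator acc(axisColumn, filterInfo);
//   for each PropertyList row in TopListConfig::AllData::rows
//     acc.addRow(row);          // remembers every finite value in the column
//   std::set< double > breaks;
//   acc.getBreaks(breaks);      // roughly 40 boundaries, linear or log,
//                               // rounded to the filter's precision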
///////////////////////////////////////////////////////////////////// class TopListAccumulator { private: static const int desiredOutputCount = 40; std::multiset< double > _values; int _totalCount; ColumnListConfig::Mapping const *_names; double _precision; // i.e. 0.01 if we want to say 12.34 but not 12.341 bool _isPercent; void computeLinearBreaks(double lowIndex, double highIndex, std::set< double > &breaks) const { assert(lowIndex < highIndex); // Notice the fencepost problem. If we want 3 bars, we need 4 breaks. // one break for the lowest value, and one for the highest value. const double perTick = (highIndex - lowIndex) / desiredOutputCount * _precision; //std::cout<<"low="<<(lowIndex*_precision)<<", high="<<(highIndex*_precision)<<", lowIndex="< value) value = ceil(high / _precision) * _precision; } else if (value == 100.0) // This is a nasty trick. There is often a lot of stuff at exactly // 0.0 and exactly 100.0. Most other values are typically not on // an exact boundary. We want 0.0 to be part ofthe 0-5 candle, and // we want 100.0 to be part of the 95-100 candle, not the 100-105 // candle. Normally if a value is right on a boundary, we put that // in the candle on the right. Here we're sliding one boundary over // just enough that 100.0 will be in the previous candle. But // hopefully that will get rounded off to 100 before it is displayed // to the user. // // Of course, this would fail if we actually were dealing with // discrete values. Then we'd want 21 candles, rather than 20, // representing 0-100. What if the nubmer of 0.0's is significant? // That seems reasonable for something like range for the last // 1 minute. In that case the GUI would ignore all of the "between" // values. And we're labeling the 100.0's as between values. value = 100.0 * (1.0 + DBL_EPSILON); breaks.insert(value); } } /* This was an interesting attempt. We tried to mix a linear scale in * the middle with a log scale on the ends. It didn't look good. { // Reserve half of the space for the part between 0 and 100, and make // that linear. The extremes are added as an exponential. But I // look at the difference between the points and 50, not 0. It // should all be semetric about 50. 
const double drawLowIndex = std::min(lowIndex, 0.0); const double drawHighIndex = std::max(highIndex, 100.0 / _precision); const double index50Percent = 50.0 / _precision; const double leftWeight = log(index50Percent - drawLowIndex) - log(index50Percent); const double rightWeight = log(drawHighIndex - index50Percent) - log(index50Percent); assert((leftWeight >= 0) && (rightWeight >= 0)); for (int i = 0; i < 100; i += 5) breaks.insert(i); const double leftCount = round(20.0 * leftWeight / (leftWeight + rightWeight)); const double rightCount = 20.0 - leftCount; if (leftCount > 1) { const double perTick = leftWeight / leftCount; for (int i = 1; i < leftCount; i++) { const double logIndex = log(index50Percent) + perTick * i; const double index = index50Percent - exp(logIndex); const double value = round(index * _precision); breaks.insert(value); } } if (rightCount > 1) { const double perTick = rightWeight / rightCount; for (int i = 1; i < rightCount; i++) { const double logIndex = log(index50Percent) + perTick * i; const double index = index50Percent + exp(logIndex); const double value = round(index * _precision); breaks.insert(value); } } } */ } void computeLogBreaks(double lowIndex, double highIndex, std::set< double > &breaks) const { //std::cout<<"lowIndex="<= highNegativeIndex) // We expect to exit this loop right after adding // highNegativeIndex to the set. We say >= rather than // == in case of round off error. (In the worst case we'd // add one extra value.) The test (i <= desiredOutputCount) // is only to avoid an infinite loop in case something crazy // happens, like a NAN or INF or something else that causes // this if statement not to work as expected. (I can't think // of a specific case, but I can imagine strange cases.) break; } } if (hasPositive) { //std::cout<<"lowPositiveIndex="< const &breaks) const { int result = 0; std::multiset< double >::const_iterator it = _values.begin(); for (std::set< double >::const_iterator breaksIt = breaks.begin(); breaksIt != breaks.end(); breaksIt++) { const double value = *breaksIt; int count = 0; while ((it != _values.end()) && (*it <= value)) { count++; it++; } // We are computing sum of the square of the size of each bar. // The lowest possible value would be if all of the bars are the // the same. The highest possible value would be if all the bars // but one were 0. result += count*count; } return result; } public: TopListAccumulator(ColumnListConfig::Mapping const *names, AllConfigInfo::PairedFilterInfo const *filterInfo) : _totalCount(0), _names(names), _precision(1.0), _isPercent(false) { if (filterInfo && (filterInfo->format.size() == 1)) { const char ch = filterInfo->format[0]; if (ch == 'p') // Price. For simplicity I'm calling these all 2 digits. Really // it's more complicated that that. _precision = 0.01; else if ((ch >= '0') && (ch <= '9')) _precision = pow(0.1, ch - '0'); if (filterInfo->baseName == "EarningD") // 0.25 actually works just fine in the logic below. It wasn't the // best for this filter, but we can consider it for other cases. //_precision = 0.25; // I've been trying to avoid special logic for individual filters, // but this helped a lot. We normally want to display 1.25 or // something like that, because we are precise to 1/4. But the // histogram looked much better with precision set to 1. Mostly // that's because we try to make about 40 bars in the hisogram, and // there are about 40 unique values when you round this filter to the // day. 
        _precision = 1.0;
    }
    if (filterInfo)
    {
      const std::string flip = filterInfo->flip;
      if (flip.size() > 0)
        if (flip[0] == '%')
          _isPercent = true;
    }
  }

  void addRow(PropertyList const &row)
  {
    _totalCount++;
    const double value = (!_names)?
      0.0:
      strtodDefault(getPropertyDefault(row, _names->altName),
                    std::numeric_limits< double >::infinity());
    if (finite(value))
      _values.insert(value);
  }

  void getBreaks(std::set< double > &breaks) const
  {
    if (_values.empty())
      return;
    const double lowIndex = floor(*_values.begin() / _precision);
    const double highIndex = ceil(*_values.rbegin() / _precision);
    if (_isPercent)
    {
      computePercentBreaks(lowIndex, highIndex, breaks);
    }
    else if (highIndex - lowIndex + 1 <= desiredOutputCount)
    {
      for (double index = lowIndex; index <= highIndex; index++)
        breaks.insert(index * _precision);
    }
    else
    {
      computeLogBreaks(lowIndex, highIndex, breaks);
      if (breaks.size() == 0)
        // computeLogBreaks reported an error.  Fall back to something
        // simpler.
        computeLinearBreaks(lowIndex, highIndex, breaks);
      else
      {
        // Try log and linear.  Return whichever one is smoother.
        std::set< double > altBreaks;
        computeLinearBreaks(lowIndex, highIndex, altBreaks);
        if (bumpiness(breaks) > bumpiness(altBreaks))
          breaks.swap(altBreaks);
      }
    }
  }

  std::multiset< double > const &getValues() const { return _values; }

  std::string const *getInternalCode() const
  { return _names?&_names->internalCode:NULL; }

  int getTotalCount() const { return _totalCount; }
};

/////////////////////////////////////////////////////////////////////
// MarketSummaryRequest
//
// This was inspired by MatchingSymbolCountRequest.  In fact, early
// versions of the client used MatchingSymbolCountRequest.  But that
// was not powerful enough.
//
// This looks through a bunch of symbols that match your request.  It
// groups them based on a specified column.  It then provides a
// summary of each group.  The summary could be the number of symbols
// in the group, or the sum of all values of a specific field (i.e.
// the 5 minute volume) for each symbol in the group.
//
// The algorithm for breaking up the data into groups is very similar
// to what we use in MatchingSymbolCountRequest.  In fact, the exact
// same code comes up with the values to put on the axis.  However,
// MatchingSymbolCountRequest tried to be very careful about symbols
// exactly matching an axis point, vs. symbols between two points.
// This was required to make the filters somewhat accurate.  This
// class considers the axis point to be the center of the group.
// Each stock is associated with the closest axis point.
//
// Possible extensions:
// o Group in two dimensions.  Two separate fields are broken into
//   pieces.  So one group might be stocks priced between $1 and $2
//   with relative volume between 2.0 and 2.5.
// o Multiple outputs per group.  For example the height and the
//   color of the bar.
// o More complicated formulas.  Like the average volume, rather than
//   the total volume.  See the Quantalytics charts for more ideas.
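//
// A small worked example (hypothetical numbers):  suppose the first column
// is Price and the axis points come out as 1.0, 2.0, 3.0, ...  A symbol
// priced at 1.40 goes into the 1.0 group and one priced at 1.60 into the
// 2.0 group, because each stock is assigned to the closest axis point.
// With only one column the reply is the number of matching symbols in each
// group; with a second column, such as 5 minute volume, the reply is the
// sum of that column per group.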
///////////////////////////////////////////////////////////////////// class MarketSummaryRequest : public TopListQueueItem { private: bool _initialized; bool _responseSent; TopListConfig _parsed; static double nearestAxisPoint(std::set< double > const &axis, double point); void initialize(WorkerThread *resources); void getData(WorkerThread *resources); public: void doWork(WorkerThread *resources); MarketSummaryRequest(ExternalRequest *request) : TopListQueueItem(request), _initialized(false), _responseSent(false) { } }; void MarketSummaryRequest::initialize(WorkerThread *resources) { assert(!_initialized); _parsed.load(_config, _userId, *resources->getReadOnlyDatabase(), true); _parsed.removeIllegalData(_userId, *resources->getReadOnlyDatabase()); _nextTime.currentTime(); _nextTime.addMicroseconds(1000); _initialized = true; setDayNumber(_parsed.dayNumber(*resources->getReadOnlyDatabase())); } double MarketSummaryRequest::nearestAxisPoint(std::set< double > const &axis, double point) { const std::set< double >::const_iterator keyLowerBound = axis.lower_bound(point); if (keyLowerBound == axis.end()) // This point is lower than the lowest axis point. Round up to the lowest // point. return *axis.begin(); if (*keyLowerBound == point) // We found an exact match. Use it! return point; std::set< double >::const_iterator nextKey = keyLowerBound; nextKey++; if (nextKey == axis.end()) // This point is higher than the highest axis point. Round down to the // highest point. return *axis.rbegin(); const double middle = ((*keyLowerBound) + (*nextKey)) / 2.0; // Use a simple linear comparison to say which key is closer. That might // not be perfect since we sometimes use a log scale. if (point < middle) return *keyLowerBound; else return *nextKey; } // Grab the i-th column from the list. If there is no i-th column, return null. // If the i-th column is not in the filterList, return null. Presumably the filter // list will be filled with filters but not columns. So D_Symbol will return // null. static ColumnListConfig::Mapping const * getFilterColumn(int i, ColumnListConfig::Mappings const &columns, PairedFilterList const &filterList) { if (i < 0) return NULL; if (i >= (int)columns.size()) return NULL; ColumnListConfig::Mapping const *const result = &columns[i]; AllConfigInfo::PairedFilterInfo const *const info = filterList.get(result->internalCode); if (!info) return NULL; return result; } void MarketSummaryRequest::getData(WorkerThread *resources) { assert(_initialized && !_responseSent); XmlNode response; TopListConfig::AllData allData; _parsed.getData(allData, _userId, false /* useEasternTime */, *resources->getReadOnlyDatabase(), true /* showAllData */); PairedFilterList filterList(_userId, *resources->getReadOnlyDatabase(), true /* topList */, true /* onlyFilters */); ColumnListConfig::Mappings const &columns = _parsed.getColumnMappings(); /* TclList msg; msg<debugDump(); sendToLogFile(msg); */ // This next part should be more configurable. The config string gives // us a universe of columns to look at. But other instructions should tell // us what to do with them. For now: // o If we have no columns, we just count the number of matching symbols. // o If we have at least one column the first column is the X axis. // o If the first column is set to a non-filter column, that's a placeholder. // That means that the X axis will have only one point, as if the field were // empty. But the second column can still be present to allow us to // act in the normal way. 
// o The second column is the data to sum and return. If there is only // one column, then we count the instances of that column and return // that. With no columns, we just count the number of matching rows. // o If we have a thrid column, we do a weighted average for each group, // and the third column is the weight. In this case the second column // is averaged rather than summed. // // We could have a grid. We hope to have a grid at some point. Then the // input could request one or two axes. We might want to output two or // more pieces of data for each point, such as the height and color of the // line. And we might have more complicated data, like an average rather // than a sum. ColumnListConfig::Mapping const *const axisColumn = getFilterColumn(0, columns, filterList); TopListAccumulator axis(axisColumn, axisColumn? filterList.get(axisColumn->internalCode): NULL); ColumnListConfig::Mapping const *const dataColumn = getFilterColumn(1, columns, filterList); ColumnListConfig::Mapping const *const weightColumn = getFilterColumn(2, columns, filterList); /* msg.clear(); msg<debugDump(); msg<<"dataColumn"; if (!dataColumn) msg<<"NULL"; else msg<debugDump(); msg<<"weightColumn"; if (!weightColumn) msg<<"NULL"; else msg<debugDump(); sendToLogFile(msg); */ // This seems to be completely redundant! We did exactly this a few lines // above. No one has touched allData since then, and none of the other // inputs have changed either. So we could remove either of these. // TODO: remove one of these. _parsed.getData(allData, _userId, false /* useEasternTime */, *resources->getReadOnlyDatabase(), true /* showAllData */); for (TopListConfig::Rows::const_iterator rowIt = allData.rows.begin(); rowIt != allData.rows.end(); rowIt++) axis.addRow(*rowIt); std::set< double > breaks; axis.getBreaks(breaks); if (breaks.empty()) { addToOutputQueue(getSocketInfo(), response.asString(), _controlMessageId); _responseSent = true; _done = true; return; } // Initialize every field to 0 so the client can see that the field exists, // and leave room for it even if it's empty. std::map< double, double > toReport; for (std::set< double >::const_iterator it = breaks.begin(); it != breaks.end(); it++) toReport[*it] = 0.0; std::map< double, double > divisor; for (TopListConfig::Rows::const_iterator rowIt = allData.rows.begin(); rowIt != allData.rows.end(); rowIt++) { const double baseKeyValue = axisColumn? strtodDefault(getPropertyDefault(*rowIt, axisColumn->altName), std::numeric_limits< double >::infinity()): 0.0; if (finite(baseKeyValue)) { const double key = nearestAxisPoint(breaks, baseKeyValue); if (!dataColumn) // Just count the number of rows. toReport[key]++; else { const double possibleValue = strtodDefault(getPropertyDefault(*rowIt, dataColumn->altName), std::numeric_limits< double >::infinity()); if (finite(possibleValue)) { if (!weightColumn) // Try to sum the values. toReport[key] += possibleValue; else { // Do a weighted average. const double possibleWeight = strtodDefault(getPropertyDefault(*rowIt, weightColumn->altName), std::numeric_limits< double >::infinity()); if (finite(possibleWeight)) { toReport[key] += possibleValue * possibleWeight; divisor[key] += possibleWeight; TclList msg; msg<internalCode; for (std::map< double, double >::const_iterator it = toReport.begin(); it != toReport.end(); it++) { XmlNode &tickMark = report[-1]; // This seems a little too simple. If we have more than one input or // more than one output, this won't extend well. 
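      // Worked example of the weighted-average branch (hypothetical numbers):
      // if two rows land on the same tick mark with values 10 and 20 and
      // weights 1 and 3, then toReport[key] = 10*1 + 20*3 = 70 and
      // divisor[key] = 1 + 3 = 4, so the Y written below is 70 / 4 = 17.5.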
tickMark.properties["X"] = ntoa(it->first); if (!weightColumn) // Copy the sum to the client. tickMark.properties["Y"] = ntoa(it->second); else { // Compute the average const double d = divisor[it->first]; if (d != 0.0) tickMark.properties["Y"] = ntoa(it->second / d); } tickMark.name = "TICK_MARK"; } addToOutputQueue(getSocketInfo(), response.asString(), _controlMessageId); _responseSent = true; _done = true; } void MarketSummaryRequest::doWork(WorkerThread *resources) { if (_initialized) getData(resources); else initialize(resources); } ///////////////////////////////////////////////////////////////////// // MatchingSymbolCountRequest // // This is mostly aimed at the config window. People have a filter // and they don't know what's a good value to go in it. So we create // a histogram showing how many symbols correspond to different // possible filter values. // // The implementation is based on a top list. They seem similar. // Look for stocks matching a filter, regardless of any alerts. The // timing aspects of the top list are useful, too. Most of the time // we expect people to request realtime data during market hours, but // to look at the most recent close at other times. (We don't enforce // that. A client might allow the user to request data from a // specific time.) // // We request data for all symbols, not just the top N symbols. In // fact, we don't care about the sort order, either. After getting // the list, we build the histogram, and send that back to the client. // // The user must specify a list of columns. (Probably only 1 column, // but we allow a list.) This is what we sum up. // // Does this bring up permissions issues? A user could specify a // symbol list with just one item. The histogram would make it clear // exactly the current value for that item. It's tempting to force // all of these requests to use 15 minute delayed data. That would // completely avoid that issue. And it might help with the caching. // It's also tempting to go the other way and always use live data. // I think that's slightly faster. That assumes we can get around // these permissions issues and say we're only presenting an // aggregate of a lot of symbols. // // It's tempting to try to cache some results. We expect the client // to send a pair of requests at the same time. One is for the set // of all symbols, and the other matches the user's current config // window. The former should be easy to spot, because the client // will request a default config string (i.e. ""). The trickier // issue is detecting user specific filters. There may also be // permissions issues, like delayed vs. realtime and/or missing // exchanges. For now it's easy to skip this step. At least wait // until we get a better look at the client. ///////////////////////////////////////////////////////////////////// class MatchingSymbolCountRequest : public TopListQueueItem { private: bool _initialized; bool _responseSent; TopListConfig _parsed; void initialize(WorkerThread *resources); void getData(WorkerThread *resources); class Accumulator { private: TopListAccumulator _breaks; public: Accumulator(ColumnListConfig::Mapping const *names, AllConfigInfo::PairedFilterInfo const *filterInfo) : _breaks(names, filterInfo) { } void addRow(PropertyList const &row) { _breaks.addRow(row); } void addToReport(XmlNode &parent) const { std::multiset< double > const &values = _breaks.getValues(); if (values.empty()) // This could happen for a number of reasons. Maybe a filter is // crap. 
Maybe the other constraints are too tight and we don't look // at any rows. Maybe this is a special row, like the symbol. For // simplicity we just don't report anything. return; XmlNode &report = parent[-1]; if (_breaks.getInternalCode()) report.properties["CODE"] = *_breaks.getInternalCode(); report.properties["TOTAL"] = ntoa(_breaks.getTotalCount()); std::set< double > breaks; _breaks.getBreaks(breaks); std::multiset< double >::const_iterator it = values.begin(); int accumulator = 0; for (std::set< double >::const_iterator breaksIt = breaks.begin(); breaksIt != breaks.end(); breaksIt++) { const double value = *breaksIt; int countLessThan = 0; int countEqual = 0; while ((it != values.end()) && (*it <= value)) { if (*it == value) countEqual++; else countLessThan++; it++; } XmlNode &tickMark = report[-1]; tickMark.properties["VALUE"] = ntoa(value); // Report the total number of items that are less than this value. accumulator += countLessThan; tickMark.properties["LESS"] = ntoa(accumulator); if (countEqual) { // Report the total number of items that are less than or equal // to this value. If this is missing, the client should copy // the value from LESS. accumulator += countEqual; tickMark.properties["NOT_MORE"] = ntoa(accumulator); } } } }; typedef std::vector< Accumulator > Accumulators; public: void doWork(WorkerThread *resources); MatchingSymbolCountRequest(ExternalRequest *request) : TopListQueueItem(request), _initialized(false), _responseSent(false) { } }; void MatchingSymbolCountRequest::initialize(WorkerThread *resources) { assert(!_initialized); _parsed.load(_config, _userId, *resources->getReadOnlyDatabase(), false); _parsed.removeIllegalData(_userId, *resources->getReadOnlyDatabase()); //std::cout<<"_config"<<_config<<" ==> "<<_parsed.save()<getReadOnlyDatabase())); } void MatchingSymbolCountRequest::getData(WorkerThread *resources) { assert(_initialized && !_responseSent); TopListConfig::AllData allData; _parsed.getData(allData, _userId, false /* useEasternTime */, *resources->getReadOnlyDatabase(), true /* showAllData */); PairedFilterList filterList(_userId, *resources->getReadOnlyDatabase(), true /* topList */, true /* onlyFilters */); // This next line might be helpful. This is how you find out if price data // in a row should be reported with 2 digits or 4 after the decimal point. //if (source->getBooleanField("four_digits")) ColumnListConfig::Mappings const &mappings = _parsed.getColumnMappings(); Accumulators accumulators; for (ColumnListConfig::Mappings::const_iterator it = mappings.begin(); it != mappings.end(); it++) accumulators.push_back(Accumulator(&*it, filterList.get(it->internalCode))); for (TopListConfig::Rows::const_iterator rowIt = allData.rows.begin(); rowIt != allData.rows.end(); rowIt++) for (Accumulators::iterator aIt = accumulators.begin(); aIt != accumulators.end(); aIt++) aIt->addRow(*rowIt); XmlNode message; //message.properties["MAPPING_COUNT"] = ntoa(mappings.size()); //message.properties["ROW_COUNT"] = ntoa(allData.rows.size()); // Make a sub-node just for future flexibility. We can always add new nodes // with different types of data and older clients will ignore them. XmlNode &filters = message["FILTERS"]; // Add one sub node per requested filter. The initial use is expected to // have only one filter. But it seems easier to do it this way, since we // are reusing the existing column list mechanism, and that allows any // number of columns. We do not report anything for a filter if there is // no interesting data to report. I.e. 
all values are null. We do not // promise to send the nodes in any particular order. // Note: In testing it's been very helpful to send multiple requests at // once. for(Accumulators::const_iterator it = accumulators.begin(); it !=accumulators.end(); it++) it->addToReport(filters); addToOutputQueue(getSocketInfo(), message.asString(), _controlMessageId); _responseSent = true; _done = true; } void MatchingSymbolCountRequest::doWork(WorkerThread *resources) { // This might be overkill in this class. We *expect* the client to always // ask for realtime data. However, it's simpler for us if we don't try // to inspect or change the time in the request. And it's more powerful // for the client. A future client might give the end user an option // to select a different time. if (_initialized) getData(resources); else initialize(resources); } ///////////////////////////////////////////////////////////////////// // TopListRequest // // This class satisfies top list windows in the client. TopListQueueItem // can be used for other types of requests that also use a TopListConfig, // but these classes look significantly different to the client software. // // This class handles pretty much all of the work in this unit. There are // three basic tasks, all of which require the database. Some require a // specific database because they are history requests. Others don't care // which database, but we still use the same WorkerCluster because we've // already allocated it. Presumably relatively few requests will need this // so it would be a shame to waste it. These requests can all sit in the // same queue because the main thread does the same things with them. Either // it sends them to a worker thread, or it deletes them. That's all the main // thread knows. This object is the one that knows the different tasks. // These tasks are: // 1) Initial setup. This includes things like checking permissions in // the database to make sure we're not requesting illegal symbol lists. // More generally, converting the request from the client request into // an SQL request. This might or might not be done at the same time as the // next request, without a second trip to the worker thread. // 2) Live or delayed data. These will be available from any database // server. For simplicity we might assume the delay will be small so we // are always looking at today's data. It would be possible to treat // every delayed request like a historical request, for the most // flexibility, but that seems like a lot of effort for little payback. // 3) Historical data. This is similar to #2 but you expect to check the // shard list to decide which database server gets the request. ///////////////////////////////////////////////////////////////////// class TopListRequest : public TopListQueueItem { private: // Housekeeping. Who made this request? Who receives the reply? const bool _saveToMru; const std::string _windowId; ExternalRequest::MessageId _dataMessageId; // Current state. bool _initialized; // We've done the one time setup. TopListConfig _parsed; // This is loaded when _initialized is true. 
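  // Lifecycle sketch (descriptive only):  doWork() is called repeatedly from
  // a worker thread.  The first call runs initialize(), which parses _config
  // and sends the "info" node.  Every later call runs getData(), which sends
  // a "data" node; live requests then push _nextTime 15 seconds into the
  // future, while history requests set _done after the first batch.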
  void initialize(WorkerThread *resources);
  void getData(WorkerThread *resources);

public:
  void doWork(WorkerThread *resources);
  TopListRequest(ExternalRequest *request);
  void setDataMessageId(ExternalRequest::MessageId messageId)
  { _dataMessageId = messageId; }
  std::string const &getWindowId() const { return _windowId; }
  ~TopListRequest();
};

TopListRequest::TopListRequest(ExternalRequest *request) :
  TopListQueueItem(request),
  _saveToMru(request->getProperty("save_to_mru", "1") == "1"),
  _windowId(request->getProperty("window_id")),
  _initialized(false)
{
  // A note about queues and timing.  All database requests are put into the
  // same queue.  This was based on the queue for realtime alerts, but in
  // this unit everything goes in that queue.  Realtime alerts were much
  // more complicated.
  //
  // This simplifies things in part because of the priorities.  If we want
  // something done (approximately) right now, we set the _nextTime to now.
  // If nothing else is ready, then the new item will be the first item in
  // the queue and it will be processed immediately.  On the other hand, if
  // the queue is already backed up, we first work off the stuff that is
  // already late before we get to the new item.  But with only one queue,
  // nothing will get lost.  Eventually we will get to every item.
  //
  // We put in a brief pause here, for a new item, for a couple of reasons.
  // We don't want people to overwhelm us with new requests.  Mostly we
  // want to deliver data at a rate of about 1 update per 15 seconds.  Also,
  // we want to give the client time to send the open channel request.
  // Ideally that would come first.  But it makes things easier on the client
  // if we allow things just slightly out of order.
  //
  // Note that we don't wait forever.  If the channel is not open by the time
  // we process this request for the first time, then we report an error.
  // We are considering two different types of problems on the client side.
  // 1) The client got disconnected, and is in the process of
  //    reconnecting.  While reconnecting it sends all the requests for
  //    specific top lists, and it sends the request for the channel.  But
  //    these are out of order.  The brief pause mentioned above handles that
  //    case.
  // 2) The client sent the channel request an instant before the data
  //    request, but in between the two messages the client disconnected
  //    and reconnected.  So the channel request got lost forever, and the
  //    client needs to send it again.
  //
  // Note:  This setup with individual requests and a single channel for
  // results was based on history and oddsmaker, but changed to make things
  // easier for the client.  This is the first server change since we started
  // the C# client.  That uses threads in a different way than the Delphi
  // client.  But we also reorganized the code so it was more modular.
  // Different pieces know less about each other.  That's why the server side
  // needed to change, to give the client a little more leeway.
  //
  // Note:  The previous note was based on the initial design of the client.
  // Eventually I realized it was better to make things more like history.  I
  // left the server code alone.  It is more flexible and that doesn't hurt.
  // The client is less sloppy, so it won't need this flexibility.
}

static const std::string emptyResponse = XmlNode().asString();

TopListRequest::~TopListRequest()
{
  // We always respond to the client when we are done with the request.  All
  // data comes through the unique data channel.
  // If the client did not get all the data it expected, when it gets this
  // response, it should resend the original request.
  addToOutputQueue(getSocketInfo(), emptyResponse, _controlMessageId);
}

void TopListRequest::initialize(WorkerThread *resources)
{
  if (_dataMessageId.isEmpty())
  {
    _done = true;
    return;
  }
  _parsed.load(_config, _userId, *resources->getReadOnlyDatabase(),
               _allowNonFilterColumns);

  // Like history and the OddsMaker we expect the client to iterate over all
  // the nodes.  Most of the time there will only be one node per message, but
  // we reserve the right to send more.
  XmlNode message;
  XmlNode &group = message["TOPLIST"];
  group.properties["TYPE"] = "info";
  group.properties["WINDOW"] = _windowId;
  _parsed.getInitialDescription(group, _userId,
                                *resources->getReadOnlyDatabase());
  addToOutputQueue(getSocketInfo(), message.asString(), _dataMessageId);

  if (_saveToMru)
    _parsed.saveToMru(_userId, *resources->getMasterDatabase());

  // As with the alerts, we always send the config string back to the user
  // before checking for illegal data.  That way if there is something
  // that is temporarily disabled, the client will continue to ask for that,
  // and might eventually get it.  This system is not perfect.  Now that
  // users can have custom fields, this doesn't completely make sense anymore.
  _parsed.removeIllegalData(_userId, *resources->getReadOnlyDatabase());

  _nextTime.currentTime();
  _nextTime.addMicroseconds(1000);
  _initialized = true;
  setDayNumber(_parsed.dayNumber(*resources->getReadOnlyDatabase()));
}

void TopListRequest::getData(WorkerThread *resources)
{
  XmlNode message;
  XmlNode &group = message["TOPLIST"];
  group.properties["TYPE"] = "data";
  group.properties["WINDOW"] = _windowId;
  TopListConfig::AllData allData;
  const bool useEasternTime = userInfoUseEasternTime(getSocketInfo());
  _parsed.getData(allData, _userId, useEasternTime,
                  *resources->getReadOnlyDatabase());
  for (TopListConfig::Rows::const_iterator it = allData.rows.begin();
       it != allData.rows.end();
       it++)
  {
    XmlNode &asXml = group[-1];
    asXml.properties = *it;
  }
  if (_parsed.history())
    group.properties["DONE"] = "1";
  if (!allData.startTime.empty())
    group.properties["START_TIME"] = allData.startTime;
  if (!allData.endTime.empty())
    group.properties["END_TIME"] = allData.endTime;
  addToOutputQueue(getSocketInfo(), message.asString(), _dataMessageId);
  if (_parsed.history())
    _done = true;
  else
  {
    _nextTime.currentTime();
    _nextTime.addSeconds(15);
  }
}

static const std::string s_TopListGetData = "TopListGetData";
static const std::string s_TopListInitialize = "TopListInitialize";

void TopListRequest::doWork(WorkerThread *resources)
{
  if (_initialized)
  {
    ThreadMonitor::SetState state(s_TopListGetData);
    ThreadMonitor::find().increment(s_TopListGetData);
    getData(resources);
  }
  else
  {
    ThreadMonitor::SetState state(s_TopListInitialize);
    ThreadMonitor::find().increment(s_TopListInitialize);
    initialize(resources);
  }
}

/////////////////////////////////////////////////////////////////////
// TopListQueueManager
//
// This is not strictly necessary, but it avoids some circular
// references and generally makes things a little cleaner.
/////////////////////////////////////////////////////////////////////

class TopListQueueManager
{
public:
  virtual void addToQueue(TopListQueueItem *) =0;
  virtual void cancel(TopListQueueItem *) =0;
  virtual ~TopListQueueManager() { }
};

/////////////////////////////////////////////////////////////////////
// TopListClient
//
// Information about all the requests from a single socket.
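//
// Rough lifecycle (descriptive sketch):
//   top_list_start (history requests) -> newRequest()  - replaces any
//       existing request with the same window id, then queues the new one.
//   top_list_stop                     -> cancelRequest()  - cancels one
//       window id, if this thread owns it.
//   socket goes away                  -> ~TopListClient()  - clear() cancels
//       whatever is left.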
/////////////////////////////////////////////////////////////////////

class TopListClient
{
private:
  // This contains an entry for each window id which is active.
  // We delete the entry when a request gets canceled.  Use the _manager
  // to determine if the request is owned by the main thread or by
  // the worker thread.  If it is in the worker thread we can't manipulate
  // the object; all we can do is ask the worker thread to delete it.
  std::map< std::string, TopListRequest * > _active;

  // Ideally this would be set in the constructor.  But that's not possible
  // because of the way that we use this class in STL data structures.
  TopListQueueManager *_manager;

  void clear();

public:
  ExternalRequest::MessageId messageId;

  TopListClient() : _manager(NULL) { }

  void newRequest(TopListRequest *request, TopListQueueManager *manager);
  bool cancelRequest(std::string const &id, TopListQueueManager *manager);

  ~TopListClient();
};

void TopListClient::newRequest(TopListRequest *request,
                               TopListQueueManager *manager)
{
  _manager = manager;
  TopListRequest *&p = _active[request->getWindowId()];
  if (p)
    // This replaces an existing request.  On the client side, request ids are
    // never reused.  That caused some confusion in the alerts code because the
    // client never knew for certain if a response came from the current
    // request or the previous one.  However, the server doesn't really care.
    // It is happy to replace a request because it's easy for us to do and
    // that makes things flexible.  We still call it a "window id" when we
    // should be calling it a "request id".  That name stuck from the alerts
    // code.
    manager->cancel(p);
  p = request;
  manager->addToQueue(p);
}

bool TopListClient::cancelRequest(std::string const &id,
                                  TopListQueueManager *manager)
{
  TopListRequest *p = getPropertyDefault(_active, id);
  if (!p)
    // The request was already deleted or perhaps it never existed.
    return false;
  manager->cancel(p);
  _active.erase(id);
  return true;
}

TopListClient::~TopListClient()
{
  if (_manager)
    clear();
  else
    assert(_active.empty());
}

void TopListClient::clear()
{
  for (std::map< std::string, TopListRequest * >::iterator it =
         _active.begin();
       it != _active.end();
       it++)
    _manager->cancel(it->second);
  _active.clear();
}

/////////////////////////////////////////////////////////////////////
// TopListThread
//
//
/////////////////////////////////////////////////////////////////////

class TopListThread : private ThreadClass, TopListQueueManager
{
private:
  enum { mtOpenChannel, mtStart, mtStop, mtMarketSummaryRequest,
         mtSymbolCountRequest, mtWorkFinished, mtQuit };

  TopListServerConnection *const _liveServerConnection;
  TopListServerConnection *const _delayedServerConnection;

  std::map< SocketInfo *, TopListClient > _clients;

  SelectableRequestQueue _incoming;
  WorkerCluster _workerCluster;

  typedef std::pair< TimeVal, TopListQueueItem * > TimerQueueItem;
  static TimerQueueItem makeTimerQueueItem(TopListQueueItem *);
  typedef std::set< TimerQueueItem > TimerQueue;
  TimerQueue _timerQueue;

  virtual void addToQueue(TopListQueueItem *);
  virtual void cancel(TopListQueueItem *);

protected:
  void threadFunction();

public:
  TopListThread();
  ~TopListThread();
};

TopListThread::TimerQueueItem
TopListThread::makeTimerQueueItem(TopListQueueItem *request)
{
  return TimerQueueItem(request->getNextTime(), request);
}

void TopListThread::addToQueue(TopListQueueItem *request)
{
  _timerQueue.insert(makeTimerQueueItem(request));
}

void TopListThread::cancel(TopListQueueItem *request)
{
  // We crashed here with a core dump on 11/8/2016 around 12:30am.  (And again
  // on dana Jun 30 2017, 11:03.)
  // It appears that the request had already been deleted before we got here,
  // so accessing it caused a segmentation violation.  The socket associated
  // with this request was in the process of going down.  In fact, the stack
  // trace said that we got here by handling the DeleteSocketThread::callbackId
  // message.
  //
  // The stack trace looked like this:
  //   DeleteSocketThread::callbackId ->
  //   _clients.erase(socket) ->
  //   TopListClient::~TopListClient() ->
  //   TopListClient::clear() ->
  //   TopListThread::cancel() ->
  //   TopListThread::makeTimerQueueItem()
  //
  // makeTimerQueueItem() failed because it tried to dereference the pointer.
  // We never should have called makeTimerQueueItem().  There's a clear flaw in
  // our logic here.  The comments below correctly say that we aren't sure if
  // we own the request or not.  So we check if the request is in the
  // _timerQueue to find out if we own the request.  However, we can't easily
  // check _timerQueue without trying to dereference the pointer, so it's a
  // catch-22.
  //
  // There are no immediate plans to fix this bug.  It's in legacy code.  And
  // I've never seen us crash here as long as I can remember.  But it does bug
  // me.  TODO
  //
  // Notice that new code typically uses ../shared/NewWorkerCluster.[Ch] which
  // does a much better job with threads.  In particular the logic behind this
  // type of cleanup is much simpler.  Other solutions to similar problems
  // usually involve ../shared/ThreadSafeRefCount.h, or a 64-bit one-up counter
  // that we use as a key into a lookup table.
  //
  // A lot of older code tried to use a pointer, without dereferencing it, as a
  // key in a lookup table.  That would also be flawed.  It is possible that
  // another thread deleted the object, and a third thread created a new object
  // with the same pointer.

  // Try to erase from the timer queue.
  if (_timerQueue.erase(makeTimerQueueItem(request)))
    // It was in our queue, so we are responsible for deleting it.
    delete request;
  else
    // It wasn't there.  So erase it from the worker thread queue.
    return _workerCluster.remove(request);
}

TopListThread::TopListThread() :
  ThreadClass("TopList"),
  _liveServerConnection(new TopListServerConnection(false)),
  _delayedServerConnection(new TopListServerConnection(true)),
  _incoming("TopList"),
  _workerCluster("history_databases", "TopList", mtWorkFinished, &_incoming)
{
  CommandDispatcher *dispatcher = CommandDispatcher::getInstance();
  dispatcher->listenForCommand("top_list_listen", &_incoming, mtOpenChannel);
  dispatcher->listenForCommand("top_list_start", &_incoming, mtStart);
  dispatcher->listenForCommand("top_list_stop", &_incoming, mtStop);
  dispatcher->listenForCommand("symbol_count_request", &_incoming,
                               mtSymbolCountRequest);
  dispatcher->listenForCommand("market_summary_request", &_incoming,
                               mtMarketSummaryRequest);
  startThread();
}

TopListThread::~TopListThread()
{
  Request *r = new Request(NULL);
  r->callbackId = mtQuit;
  _incoming.newRequest(r);
  waitForThread();
}

void TopListThread::threadFunction()
{
  // This function was primarily based on OddsMaker::threadFunction() with
  // several modifications.  Some parts came from UserRequestControl.C.
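  // Rough shape of the loop below (descriptive summary):
  //   1) Drain _incoming and dispatch on callbackId:  open the data channel,
  //      start or stop a top list, create a MarketSummaryRequest or
  //      MatchingSymbolCountRequest, acknowledge finished work, clean up a
  //      dead socket, or quit.
  //   2) Move every _timerQueue item whose time has arrived into
  //      _workerCluster.
  //   3) Sleep until the earliest remaining _nextTime, or until the next
  //      incoming message, whichever comes first.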
ThreadMonitor::find().setState("working"); while (true) { while (Request *current = _incoming.getRequest()) { switch (current->callbackId) { case mtOpenChannel: { ExternalRequest *request = dynamic_cast< ExternalRequest * >(current); SocketInfo *const socket = request->getSocketInfo(); const ExternalRequest::MessageId messageId = request->getResponseMessageId(); _clients[socket].messageId = messageId; const bool loggedIn = userInfoGetInfo(socket).userId; if (loggedIn) { TclList msg; msg<openChannel"; LogFile::primary().sendString(msg, socket); _liveServerConnection->openChannel(socket, messageId); } else { TclList msg; msg<openChannel"; LogFile::primary().sendString(msg, socket); _delayedServerConnection->openChannel(socket, messageId); } break; } case mtStart: { ExternalRequest *request = dynamic_cast< ExternalRequest * >(current); SocketInfo *const socket = request->getSocketInfo(); const std::string collaborate = request->getProperty("long_form"); TopListTimeConfig timeConfig; timeConfig.quickLoad(collaborate); if (timeConfig.history()) { // Use the database and the old code to access history. _clients[socket].newRequest(new TopListRequest(request), this); } else { // Send the request to the new top list servers. const bool saveToMru = request->getProperty("save_to_mru", "1") == "1"; const std::string windowId = request->getProperty("window_id"); const bool loggedIn = userInfoGetInfo(socket).userId; if (loggedIn) { //TclList msg; //msg<newTopList"<newTopList"); _liveServerConnection ->newTopList(socket, collaborate, windowId, saveToMru); } else { ThreadMonitor::find().increment("_delayedServer->newTopList"); _delayedServerConnection ->newTopList(socket, collaborate, windowId, saveToMru); } } break; } case mtStop: { ExternalRequest *request = dynamic_cast< ExternalRequest * >(current); SocketInfo *const socket = request->getSocketInfo(); const std::string windowId = request->getProperty("window_id"); bool somethingCanceled = false; if (TopListClient *client = getProperty(_clients, socket)) somethingCanceled = client->cancelRequest(windowId, this); if (!somethingCanceled) { // It wasn't something handled by this thread. So maybe it // belonged to the other thread. const bool loggedIn = userInfoGetInfo(socket).userId; if (loggedIn) { TclList msg; msg<cancelTopList"<cancelTopList(socket, windowId); } else _delayedServerConnection->cancelTopList(socket, windowId); } break; } case mtMarketSummaryRequest: { ExternalRequest *request = dynamic_cast< ExternalRequest * >(current); addToQueue(new MarketSummaryRequest(request)); break; } case mtSymbolCountRequest: { // telnet chuck-liddell 8800 // command=symbol_count_request&long_form=show0%3DPrice%26show1%3DRV%26show2%3DTV%26show3%3DRSI5%26show4%3DUp&message_id=7 ExternalRequest *request = dynamic_cast< ExternalRequest * >(current); addToQueue(new MatchingSymbolCountRequest(request)); //_clients[request->getSocketInfo()] // .newRequest(new TopListRequest(request), this); break; } case mtWorkFinished: { TopListQueueItem *request = dynamic_cast< TopListQueueItem * >(current); const bool aborted = _workerCluster.acknowledge(request); if ((!aborted) && (!request->isDone())) { // We are not done. Work with it again soon. current = NULL; addToQueue(request); } break; } case DeleteSocketThread::callbackId: { SocketInfo *socket = current->getSocketInfo(); _clients.erase(socket); // I suspect this next part is redundant, but it doesn't hurt. 
        // The client will delete all individual requests from the
        // various queues, but does _workerCluster have anything
        // else, maybe a list of sockets?
        _workerCluster.remove(socket);
        break;
      }
      case mtQuit:
      {
        delete current;
        return;
      }
      }
      delete current;
    }

    timeval nextTime;
    timeval *nextIfExists = NULL;
    while (!_timerQueue.empty())
    {
      nextTime = _timerQueue.begin()->first.waitTime();
      if (nextTime.tv_sec || nextTime.tv_usec)
      {
        // This is in the future.  No more requests are ready to go.
        nextIfExists = &nextTime;
        break;
      }
      // This request is ready.  Move it to the worker cluster.
      TopListQueueItem *request = _timerQueue.begin()->second;
      request->setDataMessageId
        (_clients[request->getSocketInfo()].messageId);
      _workerCluster.addWork(request);
      _timerQueue.erase(_timerQueue.begin());
    }
    _workerCluster.matchAll();
    _incoming.waitForRequest(nextIfExists);
    _incoming.resetWaitHandle();
  }
}

/////////////////////////////////////////////////////////////////////
// Global
/////////////////////////////////////////////////////////////////////

void initTopList()
{
  TopListConfig::globalInit();
  new TopListThread;
}
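
/////////////////////////////////////////////////////////////////////
// Note on startup (descriptive only):  initTopList() is presumably called
// once at process startup.  The TopListThread created here is never stored
// or deleted; it runs for the life of the process, handling the top_list_*,
// symbol_count_request, and market_summary_request commands registered in
// its constructor.
/////////////////////////////////////////////////////////////////////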