#include "../shared/Messages.h" #include "../shared/ThreadClass.h" #include "../shared/CommandDispatcher.h" #include "../shared/LogFile.h" #include "../shared/ReplyToClient.h" #include "../shared/MarketHours.h" #include "../shared/DatabaseWithRetry.h" #include "../shared/GlobalConfigFile.h" #include "../shared/TwoDLookup.h" #include "GetStreamingData.h" #include "Subscriptions.h" ///////////////////////////////////////////////////////////////////// // StForSymbol ///////////////////////////////////////////////////////////////////// class StForSymbol { private: enum Reason { NewMessage, DailyReset, DatabaseInit, ClientRequest }; struct Fields { int64_t messageCount; int8_t reason; } __attribute__ ((packed)); std::string _packed; bool initialized() const { return !_packed.empty(); } void init(std::string const &symbol); void update(int64_t messageCount, Reason reason); public: void resetAfterSend(); // Hopefully obvious! This does not know about clients, so it cannot notify // them. That happens elsewhere. void clear(); void increment(std::string const &symbol); void setValue(int64_t value, std::string const &symbol); // This is the normal way a client will get the data. Keep it binary mostly // to save CPU cycles. std::string const &binaryMessage() const; // Aimed at debugging. std::string textMessage(bool verbose) const; // This is useful if someone requests data about a symbol we are not // tracking. The contract says that if the client makes a request, the // server will respond. static std::string emptyBinaryMessage(std::string const &symbol); }; std::string StForSymbol::emptyBinaryMessage(std::string const &symbol) { std::string result = std::string(sizeof(Fields), '\0') + symbol; Fields *fields = (Fields *)&result[0]; fields->messageCount = 0; fields->reason = ClientRequest; return result; } void StForSymbol::init(std::string const &symbol) { // Ideally we'd be initialized in the constructor. But because this is part // of a map, we only use the default constructor. if (!initialized()) _packed = std::string(sizeof(Fields), '\0') + symbol; } void StForSymbol::resetAfterSend() { // Something changed, so we reported it to all listeners. Now we mark the // reason as ClientRequest. If someone receives this data, it was not a // push from us, but a pull from the client. if (!initialized()) return; Fields *fields = (Fields *)&_packed[0]; fields->reason = ClientRequest; } void StForSymbol::clear() { // We store the count for the entire day. The client might store counts // for shorter time frames. At midnight, we reset everything to 0 and // report that to the client. if (!initialized()) return; Fields *fields = (Fields *)&_packed[0]; fields->messageCount = 0; fields->reason = DailyReset; } void StForSymbol::increment(std::string const &symbol) { // This is the common case. A new message came. We update our count and // report it. init(symbol); Fields *fields = (Fields *)&_packed[0]; fields->messageCount++; fields->reason = NewMessage; } void StForSymbol::setValue(int64_t value, std::string const &symbol) { // This is how we initialize when we first start. We read the values from // the database. This might overwrite some old data. init(symbol); Fields *fields = (Fields *)&_packed[0]; fields->messageCount = value; fields->reason = DatabaseInit; } std::string const &StForSymbol::binaryMessage() const { return _packed; } std::string StForSymbol::textMessage(bool verbose) const { Fields *fields = (Fields *)&_packed[0]; TclList result; result<<"Message Count"<messageCount <<"Reason"<reason; if (verbose) result<<"Internal"<<_packed; return result; } ///////////////////////////////////////////////////////////////////// // SubscriptionsThread ///////////////////////////////////////////////////////////////////// class SubscriptionsThread : private ThreadClass { private: enum { mtSetValue, // Set the value. Use this to initialize when we start. mtIncrementValue, // Add one message. // Debug mtDumpData, mtDumpListeners, mtSnapshot, mtNightlyReset, mtLoadFromDatabase, mtReloadTranslations, // Subscriptions. mtListen, mtAdd, mtRemove, // Standard. mtQuit }; typedef std::map< SocketInfo *, ExternalRequest::MessageId > ListenerChannel; ListenerChannel _listenerChannel; typedef std::map< std::string, StForSymbol > Values; Values _values; typedef std::pair< std::string, SocketInfo * > Listener; typedef std::set< Listener > Listeners; Listeners _listeners; RequestQueue _incomingRequests; void sendToListeners(StForSymbol &value, std::string const &symbol); void nightlyResetNow(); time_t _nextResetTime; void checkForNightlyReset(); void loadFromDatabase(); TwoDArray _translation; std::string translate(std::string original); void loadTranslations(SocketInfo *socket = NULL); protected: void threadFunction(); public: SubscriptionsThread(); ~SubscriptionsThread(); void addData(StDatabaseMessage *message); void addData(StLiveMessage *message); static SubscriptionsThread *instance; }; SubscriptionsThread *SubscriptionsThread::instance; std::string SubscriptionsThread::translate(std::string original) { std::string result = _translation.get("ours", original); if (result.empty()) return original; else return result; } void SubscriptionsThread::nightlyResetNow() { LogFile::primary().sendString(TclList()<second.clear(); sendToListeners(current->second, current->first); // Remove the value from the cache. We just set it to 0, and 0 is the // default anyway. _values.erase(current); } LogFile::primary().sendString(TclList()< _nextResetTime) { if (_nextResetTime > 0) { // The first time though we just set the clock. We just initialized // from the database and we don't want to delete that. nightlyResetNow(); // Do this now because it's as good a time as any. Ideally this // would be coordinated with the list being rebuilt. But at // least we do this on a regular basis. loadTranslations(); } _nextResetTime = midnight(now) + 24 * MARKET_HOURS_HOUR; } } void SubscriptionsThread::loadFromDatabase() { LogFile::primary().sendString(TclList()<= CURDATE() " "GROUP BY symbol"); result->rowIsValid(); result->nextRow()) { const std::string symbol = translate(result->getStringField(0)); const int64_t count = result->getIntegerField(1, 0); StForSymbol &value = _values[symbol]; value.setValue(count, symbol); sendToListeners(value, symbol); } LogFile::primary().sendString(TclList()<callbackId = mtSetValue; _incomingRequests.newRequest(message); } void SubscriptionsThread::addData(StLiveMessage *message) { message->callbackId = mtIncrementValue; _incomingRequests.newRequest(message); } void SubscriptionsThread::sendToListeners(StForSymbol &value, std::string const &symbol) { for (Listeners::const_iterator it = _listeners.lower_bound(Listener(symbol, NULL)); (it != _listeners.end()) && (it->first == symbol); it++) { SocketInfo *const socket = it->second; addToOutputQueue(socket, value.binaryMessage(), _listenerChannel[socket]); } value.resetAfterSend(); } void SubscriptionsThread::loadTranslations(SocketInfo *socket) { const int before = _translation.getRowHeaders().size(); _translation.loadFromCSV(getConfigItem("translation_table", "stocktwits_translation.csv")); const int after = _translation.getRowHeaders().size(); TclList msg; msg<callbackId) { case mtSetValue: { // I created this when I thought the database would be in // a different thread. This is not currently used. See // loadFromDatabase() for the real way to do this. static const std::string S_SET_VALUE = "SET VALUE"; tm.setState(S_SET_VALUE); StDatabaseMessage *request = dynamic_cast< StDatabaseMessage * >(current); std::string const &symbol = request->symbol; StForSymbol &value = _values[symbol]; value.setValue(request->messageCount, symbol); sendToListeners(value, symbol); break; } case mtIncrementValue: { static const std::string S_INCREMENT_VALUE = "INCREMENT VALUE"; tm.setState(S_INCREMENT_VALUE); StLiveMessage *request = dynamic_cast< StLiveMessage * >(current); for (std::vector< std::string >::const_iterator it = request->symbols.begin(); it != request->symbols.end(); it++) { std::string const &symbol = translate(*it); StForSymbol &value = _values[symbol]; value.increment(symbol); sendToListeners(value, symbol); } break; } case mtNightlyReset: { ExternalRequest *request = dynamic_cast(current); nightlyResetNow(); addToOutputQueue(request->getSocketInfo(), "Done", request->getResponseMessageId()); break; } case mtLoadFromDatabase: { ExternalRequest *request = dynamic_cast(current); loadFromDatabase(); addToOutputQueue(request->getSocketInfo(), "Done", request->getResponseMessageId()); break; } case mtReloadTranslations: { ExternalRequest *request = dynamic_cast(current); loadTranslations(request->getSocketInfo()); addToOutputQueue(request->getSocketInfo(), "Done", request->getResponseMessageId()); break; } case mtDumpListeners: { static const std::string S_DUMP_LISTENERS = "DUMP LISTENERS"; tm.setState(S_DUMP_LISTENERS); ExternalRequest *request = dynamic_cast(current); for (ListenerChannel::const_iterator it = _listenerChannel.begin(); it != _listenerChannel.end(); it++) { TclList msg; msg<<__FILE__<<__LINE__<<__FUNCTION__ <<"listenerChannel"<second.getValue(); LogFile::primary().sendString(msg, it->first); } for (Listeners::const_iterator it = _listeners.begin(); it != _listeners.end(); it++) { TclList msg; msg<<__FILE__<<__LINE__<<__FUNCTION__ <<"listening"<first; LogFile::primary().sendString(msg, it->second); } addToOutputQueue(request->getSocketInfo(), "Done", request->getResponseMessageId()); break; } case mtDumpData: { static const std::string S_DUMP_DATA = "DUMP DATA"; tm.setState(S_DUMP_DATA); ExternalRequest *request = dynamic_cast(current); const bool verbose = request->getProperty("verbose") == "1"; for (Values::const_iterator it = _values.begin(); it != _values.end(); it++) { TclList msg; msg<<__FILE__<<__LINE__<<__FUNCTION__ <first <second.textMessage(verbose); LogFile::primary().sendString(msg); } addToOutputQueue(request->getSocketInfo(), "Done", request->getResponseMessageId()); break; } case mtSnapshot: { static const std::string S_SNAPSHOT = "SNAPSHOT"; tm.setState(S_SNAPSHOT); ExternalRequest *request = dynamic_cast(current); const std::string symbol = request->getProperty("symbol"); const bool verbose = request->getProperty("verbose") == "1"; std::string message = symbol + ": "; if (_values.count(symbol)) message += _values[symbol].textMessage(verbose); else message += "Not found"; addToOutputQueue(request->getSocketInfo(), message, request->getResponseMessageId()); break; } case mtListen: { static const std::string S_LISTEN = "LISTEN"; tm.setState(S_LISTEN); ExternalRequest *request = dynamic_cast(current); _listenerChannel[request->getSocketInfo()] = request->getResponseMessageId(); break; } case mtAdd: { static const std::string S_ADD = "ADD"; tm.setState(S_ADD); ExternalRequest *request = dynamic_cast(current); const std::string symbol = request->getProperty("symbol"); _listeners.insert(Listener(symbol, request->getSocketInfo())); SocketInfo *const socket = request->getSocketInfo(); std::string reply; if (StForSymbol const *value = getProperty(_values, symbol)) reply = value->binaryMessage(); else reply = StForSymbol::emptyBinaryMessage(symbol); addToOutputQueue(socket, reply, _listenerChannel[socket]); break; } case mtRemove: { static const std::string S_REMOVE = "REMOVE"; tm.setState(S_REMOVE); ExternalRequest *request = dynamic_cast(current); const std::string symbol = request->getProperty("symbol"); _listeners.erase(Listener(symbol, request->getSocketInfo())); break; } case mtQuit: { delete current; return; } case DeleteSocketThread::callbackId: { static const std::string S_DELETE_SOCKET = "DELETE SOCKET"; tm.setState(S_DELETE_SOCKET); SocketInfo *socket = current->getSocketInfo(); _listenerChannel.erase(socket); for (Listeners::iterator it = _listeners.begin(); it != _listeners.end(); ) { Listeners::iterator next = it; next++; if (it->second == socket) _listeners.erase(it); it = next; } break; } } delete current; } checkForNightlyReset(); _incomingRequests.waitForRequest(); } } SubscriptionsThread::SubscriptionsThread() : ThreadClass("SubscriptionsThread"), _incomingRequests(getName()), _nextResetTime(0) { CommandDispatcher *dispatcher = CommandDispatcher::getInstance(); // These are only for debugging and development. They take no arguments. // They report the internal state to the log file (because it might be // too long to see on the screen). They print a confirmation on the screen // so you know you typed it right. dump_data has one option: verbose=1 // means to include the computer friendly format, not just the user friendly // format. dispatcher->listenForCommand("dump_data", &_incomingRequests, mtDumpData); dispatcher->listenForCommand("dump_listeners", &_incomingRequests, mtDumpListeners); // This is only for debugging. It displays a user friendly dump of the // data for the given symbol. The argument is passed as symbol=DELL. // verbose=1 means to include the computer friendly format, not just the user //friendly format. This will display something, even if the symbol is not // valid. dispatcher->listenForCommand("snapshot", &_incomingRequests, mtSnapshot); // These are typically run automatically. These messages allow us to run // them at other times, for debugging. dispatcher->listenForCommand("nightly_reset", &_incomingRequests, mtNightlyReset); dispatcher->listenForCommand("load_from_database", &_incomingRequests, mtLoadFromDatabase); dispatcher->listenForCommand("reload_translations", &_incomingRequests, mtReloadTranslations); // This is how you set the channel for all subscriptions. It works just like // the oddsmaker and top list channels. You open it once and listen to it // forever. You make specific requests with other commands. Aside from // message_id there are no arguments. dispatcher->listenForCommand("listen", &_incomingRequests, mtListen); // This subscribes to a symbol. The argument is passed as symbol=DELL. This // will not display any result. All results will come back from the listen // command. This will request an immediate snapshot, in attion to any and // all updates. A duplicate call to add will request another snapshot, but // is otherwise ignored. dispatcher->listenForCommand("add", &_incomingRequests, mtAdd); // This unsubscribes. A duplicate request to unsubscribe is ignored. There // are no responses from this command. Send the symbol as symbol=DELL dispatcher->listenForCommand("remove", &_incomingRequests, mtRemove); // Also, you can use the standard ping command, from ../shared/Ping.[Ch] startThread(); } SubscriptionsThread::~SubscriptionsThread() { Request *r = new Request(NULL); r->callbackId = mtQuit; _incomingRequests.newRequest(r); waitForThread(); } ///////////////////////////////////////////////////////////////////// // StDatabaseMessage ///////////////////////////////////////////////////////////////////// StDatabaseMessage::StDatabaseMessage() : Request(NULL), messageCount(0) { } void StDatabaseMessage::send() { SubscriptionsThread::instance->addData(this); } ///////////////////////////////////////////////////////////////////// // StLiveMessage ///////////////////////////////////////////////////////////////////// StLiveMessage::StLiveMessage() : Request(NULL) { } void StLiveMessage::send() { SubscriptionsThread::instance->addData(this); } ///////////////////////////////////////////////////////////////////// // Initialization ///////////////////////////////////////////////////////////////////// void initSubscriptions() { assert(!SubscriptionsThread::instance); SubscriptionsThread::instance = new SubscriptionsThread(); }