#include #include "libjson.h" #include "../shared/ThreadClass.h" #include "../shared/SelectableRequestQueue.h" #include "../shared/ThreadMonitor.h" #include "../shared/DatabaseWithRetry.h" #include "Subscriptions.h" #include "GetStreamingData.h" // Read the streaming data from stocktwits. Read every tweet. class StreamingDataThread : private ThreadClass { private: enum { mtQuit }; SelectableRequestQueue _incoming; DatabaseWithRetry _database; // Stocktwits does not remember state. We have to tell it the last thing // that we saw, so it can restart after that. Otherwise we'd get duplicates. int64_t _startAfter; CURL *_curlHandle; void checkForData(); // We accumulate the entire result from stocktwits, and parses the entire // thing at once. std::string _result; // Every time we get data from curl, it calls writeFunction which // calls writeFunctionImpl. size_t writeFunctionImpl(void *ptr, size_t size, size_t nmemb); static size_t writeFunction(void *ptr, size_t size, size_t nmemb, void *userdata); time_t _lastDeadManTimer; void weAreAlive(); protected: void threadFunction(); public: StreamingDataThread(); ~StreamingDataThread(); }; //#include // StreamingDataThread updated to use the new StockTwits API requiring authentication. // Also note that the streams/all method changed the response format. 
// Poll the StockTwits "streams/all" endpoint once.
//
// What is still visible of this function:
//   * It builds the request URL, appending "&since=" followed by
//     ntoa(_startAfter) when _startAfter is non-zero.
//   * For each message it sends stLiveMessage, bumps the "sent message"
//     counter and calls weAreAlive().
//   * std::invalid_argument is caught, counted as "EXCEPTION in parse", and
//     the function returns.
//
// NOTE(review): the middle of this function is missing from the file.
// Everything between the commented-out "std::cout" debug line and the
// "symbols = explode(...)" statement was lost (the transfer itself, the
// "try {", the parsing loop and the creation of stLiveMessage are not here).
// The surviving text is kept verbatim below; the function cannot compile
// until the lost span is restored from version control.
//
// NOTE(review): the access token is hard-coded in the URL.  Consider loading
// it from configuration instead of keeping a credential in source.
void StreamingDataThread::checkForData()
{
  ThreadMonitor &tm = ThreadMonitor::find();
  tm.setState("checkForData()");
  std::string url = "https://api.stocktwits.com/api/2/streams/all.json";
  url += "?access_token=496ae4e0b026d197113b19942488069a2c7b7d45";
  if (_startAfter)
    {
      // Ask only for messages newer than the last one we saw.
      url += "&since=";
      url += ntoa(_startAfter);
    }
  // NOTE(review): the gap starts inside the next line.  The start of a
  // commented-out debug print and the tail of a later statement have been
  // fused together; it is left exactly as found.
  //std::cout<symbols = explode(",", stockSymbolString);
          stLiveMessage->send();
          tm.increment("sent message");
          // Refresh the dead-man timer after each message that is sent.
          weAreAlive();
        }
    }
  }
  // NOTE(review): this catches by value; catching by const reference is the
  // usual form.
  catch (std::invalid_argument ex)
    {
      tm.increment("EXCEPTION in parse");
      return;
    }
}

// Append one chunk of received data to _result.
//   ptr   - start of the chunk
//   size  - size of one element, in bytes
//   nmemb - number of elements
// Returns size * nmemb, i.e. the number of bytes appended.
size_t StreamingDataThread::writeFunctionImpl(void *ptr, size_t size,
                                              size_t nmemb)
{
  size_t bytes = size * nmemb;
  _result.append((char const *)ptr, bytes);
  return bytes;
}

// Static trampoline: treats userdata as the StreamingDataThread instance and
// forwards the chunk to its writeFunctionImpl().
// NOTE(review): presumably registered with curl as the write callback in the
// lost part of checkForData() -- confirm.
size_t StreamingDataThread::writeFunction(void *ptr, size_t size,
                                          size_t nmemb, void *userdata)
{
  return ((StreamingDataThread *)userdata)
    ->writeFunctionImpl(ptr, size, nmemb);
}

// Thread body: services _incoming and calls checkForData() in a loop until an
// mtQuit request is received.  (The statements follow this comment.)
void StreamingDataThread::threadFunction()
{
  // This whole structure seems unnecessarily complicated.  But I decided to
  // use our standard framework.  Presumably this will help if I ever add
  // realtime debug commands.
ThreadMonitor::find().setState("working"); while (true) { while (Request *current = _incoming.getRequest()) { switch (current->callbackId) { case mtQuit: { delete current; return; } } delete current; } checkForData(); timeval waitTime; waitTime.tv_sec = 1; waitTime.tv_usec = 0; _incoming.waitForRequest(&waitTime); _incoming.resetWaitHandle(); } } void StreamingDataThread::weAreAlive() { ThreadMonitor::find().increment("weAreAlive"); const time_t now = time(NULL); if (_lastDeadManTimer + 60 < now) { ThreadMonitor::find().increment("weAreAlive database"); _database.tryQueryUntilSuccess("UPDATE monitor_alive SET last_update = now() WHERE name = 'st_proxy'"); _lastDeadManTimer = now; } } StreamingDataThread::StreamingDataThread() : ThreadClass("StreamingDataThread"), _incoming(getName()), _database(false, getName()), _startAfter(0), _curlHandle(NULL), _lastDeadManTimer(0) { startThread(); } StreamingDataThread::~StreamingDataThread() { Request *r = new Request(NULL); r->callbackId = mtQuit; _incoming.newRequest(r); waitForThread(); } void initGetStreamingData() { new StreamingDataThread; }