// Polls the StockTwits "streams/all" endpoint once per second and records the
// symbols it finds in the database.
//
// NOTE(review): this copy of the file appears to have lost every piece of text
// that sat between '<' and '>' (the header names of the first four #include
// directives, the template arguments of std::vector, and a large part of
// checkForData()).  The code tokens below are kept exactly as found; restore
// the missing pieces from version control before building.
#include
#include
#include
#include
#include "libjson.h"
#include "../shared/DatabaseWithRetry.h"
#include "../shared/DatabaseSupport.h"
#include "../shared/GlobalConfigFile.h"
#include "../generate_alerts/misc_framework/AccumulateInsert.h"

// NOTE(review): with the empty parentheses this line declares a function
// named _insertAccumulator that returns an AccumulateInsert; it does not
// define an object.  Nothing in the visible code refers to it.
AccumulateInsert _insertAccumulator();

// Stocktwits does not remember state.  We have to tell it the last thing
// that we saw, so it can restart after that.  Otherwise we'd get duplicates.
// A value of 0 means "no position yet": checkForData() only adds the "since"
// parameter when this is non-zero.
// NOTE(review): no assignment to _startAfter is visible in this copy of the
// file; presumably it was updated in the part of checkForData() that is
// missing -- confirm.
int64_t _startAfter;

// The curl easy handle, reused from one poll to the next.  NULL until the
// first call to checkForData(), and reset to NULL after a failed transfer.
CURL *_curlHandle;

// We accumulate the entire result from stocktwits, and parse the entire
// thing at once.  Cleared at the start of every transfer.
std::string _result;

// Every time we get data from curl, it calls writeFunction.
//
// ptr    - start of the received bytes.
// size   - size of one element.
// nmemb  - number of elements; size * nmemb bytes are available at ptr.
// unused - the CURLOPT_WRITEDATA pointer; checkForData() sets it to NULL and
//          it is ignored here.
//
// Appends the received bytes to _result and returns the number of bytes
// appended, i.e. everything that was offered.
size_t writeFunction(void *ptr, size_t size, size_t nmemb, void *unused)
{
  size_t bytes = size * nmemb;
  _result.append((char const *)ptr, bytes);
  return bytes;
}

// StreamingDataThread updated to use the new StockTwits API requiring authentication.
// Also note that the streams/all method changed the response format.
//
// Performs one poll: downloads the stream into _result, validates and parses
// it as JSON, builds one multi-row SQL statement from the symbols found, runs
// it through db, and finally updates the 'GetAllFromST' row of monitor_alive.
//
// db - database connection used for both statements.
//
// Returns early, without touching the database, when the transfer fails, when
// the response is not valid JSON, or when std::invalid_argument is thrown
// while processing the parsed response.
void checkForData(DatabaseWithRetry &db)
{
  std::string url = "https://api.stocktwits.com/api/2/streams/all.json";
  // NOTE(review): the access token is hard-coded in the source; consider
  // moving it to configuration so it is not stored in the repository.
  url += "?access_token=496ae4e0b026d197113b19942488069a2c7b7d45";
  if (_startAfter)
  {
    // Ask only for items after the last one we saw.
    // ntoa() is defined elsewhere; presumably it formats the number as text.
    url += "&since=";
    url += ntoa(_startAfter);
  }
  // Reuse the existing handle when there is one; otherwise create it.
  // NOTE(review): the result of curl_easy_init() is not checked for NULL
  // before it is used below.
  if (_curlHandle)
    curl_easy_reset(_curlHandle);
  else
    _curlHandle = curl_easy_init();
  curl_easy_setopt(_curlHandle, CURLOPT_URL, url.c_str());
  curl_easy_setopt(_curlHandle, CURLOPT_WRITEFUNCTION, writeFunction);
  curl_easy_setopt(_curlHandle, CURLOPT_WRITEDATA, NULL);
  // NOTE(review): libcurl documents the next three options as taking a long;
  // the literals here are int.  Consider 1L / 15L / 5L -- confirm against the
  // curl_easy_setopt documentation.
  curl_easy_setopt(_curlHandle, CURLOPT_NOSIGNAL, 1);
  curl_easy_setopt(_curlHandle, CURLOPT_TIMEOUT, 15);
  curl_easy_setopt(_curlHandle, CURLOPT_CONNECTTIMEOUT, 5);
  // Start from an empty buffer; writeFunction() appends to it.
  _result.clear();
  //
  // tm.setState("curl_easy_perform()");
  CURLcode status = curl_easy_perform(_curlHandle);
  if (status != CURLE_OK)
  {
    //http://curl.haxx.se/libcurl/c/libcurl-errors.html
    // tm.increment("CURL_ERROR " + ntoa(status));
    // Throw the handle away so that the next call creates a fresh one.
    curl_easy_cleanup(_curlHandle);
    _curlHandle = NULL;
    return;
  }
  //
  // tm.setState("verify");
  // It seems like we can make our error handling a lot simpler by validating
  // the entire file here.  By default the library does lazy parsing.
  if (!libjson::is_valid(_result))
  {
    // tm.increment("JSON not valid");
    return;
  }
  //
  // tm.setState("parse string");
  try
  {
    JSONNode result = libjson::parse(_result);
    //
    // tm.setState("parse ojbect");
    //std::cout<<_result<
    // NOTE(review): the source is damaged at this point.  The code that walks
    // the parsed response, opens the two scopes that are closed further down,
    // and declares ss, first, t2, userId and stockSymbolString is missing
    // from this copy, as is the beginning of the declaration of symbols on
    // the next line.  The remaining tokens are preserved exactly as found.
    & symbols = explode(",", stockSymbolString);
        // round to minute for db insert
        // (truncates to a multiple of 60, assuming t2 is an integral type;
        // its declaration is not visible)
        t2 = t2 / 60 * 60;
        // Append one "(symbol, time, user, 1, "neutral")" tuple per symbol.
        // first tracks whether a tuple has been written yet, so that a comma
        // is placed between tuples but not before the first one.
        for (std::vector::const_iterator it = symbols.begin(); it != symbols.end(); ++it)
        {
          if (first)
            first = false;
          else
            ss << ',';
          // The symbol text is passed through mysqlEscapeString() before it
          // is embedded in the statement.
          ss << "(\"" << mysqlEscapeString(*it) << '"' << ", FROM_UNIXTIME(" << t2 << "), " << userId << ", 1, \"neutral\")";
        }
      } // closes a scope opened in the missing code (presumably the per-message loop)
      // Only run the statement when at least one tuple was appended.
      if (!first)
      {
        ss << " ON DUPLICATE KEY UPDATE count = count + 1";
        //std::cout << ss.str();
        db.tryQueryUntilSuccess(ss.str());
      }
    } // closes a second scope opened in the missing code
    // Record that this poll got as far as processing a parsed response.
    db.tryQueryUntilSuccess("UPDATE monitor_alive SET last_update = now() WHERE name = 'GetAllFromST'");
  }
  // NOTE(review): the exception is caught by value; catching by const
  // reference would avoid the copy.
  catch (std::invalid_argument ex)
  {
    std::cout << "EXCEPTION in parse" << std::endl;
    return;
  }
}

// Program entry point.  Passes the command-line arguments (without the
// program name) to the configuration helpers, opens the database connection,
// then polls StockTwits forever with a one second pause between polls.
// The loop has no exit condition, so main() never returns.
int main(int argc, char *argv[])
{
  addConfigItemsFromCommandLine(argv + 1);
  configItemsComplete();
  DatabaseWithRetry st("@st", "st");
  while (true)
  {
    checkForData(st);
    sleep(1);
  }
}