#include "../shared/ThreadMonitor.h" #include "../shared/LogFile.h" #include "IntraDayDataProvider.h" #include "Dispatcher.h" /* This is a fairly simple dispatcher. Currently it's set up more as a * prototype than a product. It collects the data but it doesn't do much * with it yet. I.e. no alerts, no way for a normal user to see the data, * no way to stop a computation. Also the plan is for the dispatcher to * send most of the computations to other threads, so we can do more work * and take advantage of more cores. */ void Dispatcher::update(GridInstance *grid, time_t time) { const int expectedRowCount = grid->getPrototype()->getCandleTimer()->getTotalRowCount(time); while (expectedRowCount > grid->completedRowCountPossible()) { ThreadMonitor::find().increment("fillRow"); _gridFiller.fillRow(grid); } } void Dispatcher::threadFunction() { // Why is this a pointer and not a smart pointer? TODO typedef std::set< GridInstance * > Grids; Grids grids; time_t time = 0; ThreadMonitor &tm = ThreadMonitor::find(); while (true) { while (Request *current = _incoming.getRequest()) { tm.setState("Working"); switch (current->callbackId) { case mtStart: { tm.increment("mtStart"); tm.setState("mtStart"); StartRequest *request = dynamic_cast< StartRequest * >(current); GridDataProvider *dataProvider = new IntraDayDataProvider(request->symbol, _candleManager); GridInstance *grid = new GridInstance(request->prototype, dataProvider); grids.insert(grid); break; } case mtCandleTime: { tm.increment("mtCandleTime"); tm.setState("mtCandleTime"); DataNodeThread::TimeRequest *request = dynamic_cast< DataNodeThread::TimeRequest * >(current); time = request->time; for (Grids::iterator it = grids.begin(); it != grids.end(); it++) update(*it, time); break; } case mtDump: { tm.increment("mtDump"); tm.setState("mtDump"); for (Grids::iterator it = grids.begin(); it != grids.end(); it++) { TclList msg; msg<getDataProvider()->debugDump() <<(*it)->debugDump(); LogFile::primary().sendString(msg, current->getSocketInfo()); } break; } case mtQuit: { delete current; return; } case DeleteSocketThread::callbackId: { //SocketClosedDataNode::doNotify(current->getSocketInfo()); break; } } delete current; } _incoming.waitForRequest(); } } Dispatcher::Dispatcher(DataNodeThread &dataNodeThread) : ThreadClass("Dispatcher"), _incoming(getName()), _candleManager(dataNodeThread.getIntradayCandleManager()) { CommandDispatcher::getInstance() ->listenForCommand("dispatcher_dump", &_incoming, mtDump); startThread(); dataNodeThread.requestTimerUpdates(&_incoming, mtCandleTime); } Dispatcher::~Dispatcher() { Request *r = new Request(NULL); r->callbackId = mtQuit; _incoming.newRequest(r); waitForThread(); } void Dispatcher::start(StartRequest *startRequest) { startRequest->callbackId = mtStart; _incoming.newRequest(startRequest); } void Dispatcher::start(std::string symbol, GridPrototypeRef prototype) { StartRequest *r = new StartRequest(); r->symbol = symbol; r->prototype = prototype; start(r); }