#ifndef __DataNodeThread_h_ #define __DataNodeThread_h_ #include "../shared/ThreadClass.h" #include "../shared/PipeConditionVar.h" #include "../generate_alerts/misc_framework/DataNodes.h" #include "../generate_alerts/data_framework/SynchronizedTimers.h" /* This was heavily inspired by ../generate_alerts/tcl/DataNodeThread.h. */ class DataNodeThread : private ThreadClass, private DataNodeListener { public: // A DoInThreadRequest should really also be a Request. Unfortunately that's // hard to enforce at compile time. An older version of this code tried, // but it used virtual inheritance, which was a big mess. submit() checks // at run time to verify that this object is indeed a Request. class DoInThreadRequest { public: virtual ~DoInThreadRequest() { } // This will be executed in the data node thread. if the function returns // true, then this request has forwarded itself to another thread, and // the data node thread is done. If this returns false, then the data // node thread should delete this request. virtual bool inDataNodeThread() =0; }; private: enum { mtRequestTimer, mtDataNodeEvent, mtDoInThread, mtQuit }; struct TimerCallback { RequestListener *requestListener; int callbackId; }; class InitializeTimeRequest : public Request { public: TimerCallback timerCallback; InitializeTimeRequest() : Request(NULL) { callbackId = mtRequestTimer; } }; RequestQueue _incoming; DataNodeManager _dataNodeManager; //ReportAlertsThread _reportAlertsThread; SynchronizedTimer *_synchronizedTimer; PipeConditionVar _initialized; typedef std::vector< TimerCallback > TimerCallbacks; TimerCallbacks _timerCallbacks; void onWakeup(int msgId); protected: void threadFunction(); public: DataNodeThread(); ~DataNodeThread(); DataNodeManager &getDataNodeManager() { return _dataNodeManager; } // Request a callback any time that the timer changes. This is a wrapper // around SynchronizedTimer. The SynchronizedTimer runs in the data node // thread. Whenever we think it's time to update the one minute candles, // we send a TimeRequest to the listener. The time will be the beginning // of the one minute candle. The listener can create other candles from // this one. This is based on the time of the prints coming in, not the // time on our local clock, so if the datafeed is a little slow, we will // hold the candles. This can be called from any thread. class TimeRequest : public Request { public: const time_t time; TimeRequest(time_t t) : Request(NULL), time(t) { } }; void requestTimerUpdates(RequestListener *requestListener, int callbackId); void submit(DoInThreadRequest *request); // Execute code in the DataNodeThread. This automatically creates and // destroys the DoInThreadRequest object, so you don't have to. // socketInfo is the socket associated with the request. This can be NULL. // If the socket is closed, this request will automatically be destroyed. // action should be a lambda taking no inputs and returning void. template < typename Action > void submit(SocketInfo *socketInfo, Action action) { class SpecificRequest : public DoInThreadRequest, public Request { private: Action const _action; virtual bool inDataNodeThread() { _action(); return false; } public: SpecificRequest(SocketInfo *socket, Action action) : Request(socket), _action(action) { } }; submit(new SpecificRequest(socketInfo, action)); } void initAlerts(); }; #endif