#ifndef __LimitAlert_h_ #define __LimitAlert_h_ #include "../shared/Messages.h" #include "../generate_alerts/data_framework/TradeDirection.h" #include "../generate_alerts/misc_framework/DataNodes.h" #include "../generate_alerts/data_framework/GenericTosData.h" class LimitAlertRequest : public Request { private: static uint64_t _lastId; static uint64_t getNextId(); const uint64_t _id; const std::string _symbol; const double _price; const TradeDirection _direction; const bool _includeFormT; public: uint64_t getId() const { return _id; } std::string const &getSymbol() const { return _symbol; } double getPrice() const { return _price; } TradeDirection getDirection() const { return _direction; } bool getIncludeFormT() const { return _includeFormT; } LimitAlertRequest(SocketInfo *socketInfo, std::string const &symbol, double price, bool isLong, bool includeFormT); // After calling onFired() the LimitAlert unit is no longer responsible for // this object. onFired() should make sure that the object is eventually // deleted. It might do work immediately then call "delete this" at the // bottom of the function. onFired() might ship the off to another thread, // so that thread can delete the object at an unspecified time. virtual void onFired() =0; }; class LimitAlertManager : private DataNodeListener, private ThreadMonitor::Extra { private: DataNodeManager *const _manager; class SymbolWatcher : private DataNodeListener { private: LimitAlertManager *const _owner; const TradeDirection _direction; const bool _includeFormT; GenericTosDataNode *_tosData; DataNodeLink *_link; std::set< std::pair< double, LimitAlertRequest * > > _requests; double _currentPrice; std::pair< double, LimitAlertRequest * > getFirst() const; virtual void onWakeup(int msgId); void checkNow(); public: SymbolWatcher(LimitAlertManager *owner, std::string const &symbol, TradeDirection direction, bool includeFormT); ~SymbolWatcher(); void insert(LimitAlertRequest *newRequest); void erase(LimitAlertRequest *request); bool empty() const { return _requests.empty(); } std::string debugDumpCategory() const; std::string debugDump() const; }; std::map< std::string, SymbolWatcher * > _watchers[2][2]; SymbolWatcher &getWatcher(std::string const &symbol, TradeDirection direction, bool includeFormT); SymbolWatcher &getWatcher(LimitAlertRequest *request); void removeWatcherSoon(LimitAlertRequest *request); std::map< uint64_t, LimitAlertRequest * > _byId; std::set< std::pair< SocketInfo *, LimitAlertRequest * > > _bySocket; void insert(LimitAlertRequest *newRequest) { assert(!_byId.count(newRequest->getId())); _byId[newRequest->getId()] = newRequest; _bySocket.insert(std::make_pair(newRequest->getSocketInfo(), newRequest)); SymbolWatcher &watcher = getWatcher(newRequest); watcher.insert(newRequest); } // The request must already be in our tables. This method removes the // pointer to the request from our table. The caller should do something // with the request after this call. void erase(LimitAlertRequest *request) { _byId.erase(request->getId()); _bySocket.erase(std::make_pair(request->getSocketInfo(), request)); SymbolWatcher &watcher = getWatcher(request); watcher.erase(request); if (watcher.empty()) removeWatcherSoon(request); } // Look for the given request. If it was in our data structures, remove it, // and return it to the caller. It is up to the caller to do something with // the request now. If it was not in our data structures, return NULL and // do nothing else. LimitAlertRequest *erase(uint64_t id) { LimitAlertRequest *request = getPropertyDefault(_byId, id); if (request) erase(request); return request; } std::string debugDump() const; virtual std::string getInfoForThreadMonitor(); void submitImpl(LimitAlertRequest *newRequest); void cancelImpl(uint64_t id); class Submit : public DataNodeManager::EventQueueListener { private: LimitAlertManager *const _owner; LimitAlertRequest *_original; public: Submit(LimitAlertManager *owner, LimitAlertRequest *original) : DataNodeManager::EventQueueListener(original->getSocketInfo()), _owner(owner), _original(original) { } virtual void onEventQueue() { _owner->submitImpl(_original); _original = NULL; } ~Submit() { delete _original; } }; SocketClosedDataNode *_socketClosedDataNode; virtual void onWakeup(int msgId); ~LimitAlertManager() { assert(false); } public: // Thread safe. void submit(LimitAlertRequest *newRequest); // Thread safe. There are no locks. You might have a message coming your // way already. But do this anyway to avoid garbage building up. void cancel(uint64_t id); LimitAlertManager(DataNodeManager *manager); // We can't automatically create this because the constructor takes an input. // But it would be convenient if other people could grab this easily once // someone created it. static LimitAlertManager *primary; }; #endif