#include #include "SelectableRequestQueue.h" #include "ThreadMonitor.h" #include "ThreadClass.h" #include "PollSet.h" #include "CurrentRequest.h" #include "SimpleLogFile.h" #include "ContainerThread.h" ///////////////////////////////////////////////////////////////////// // IContainerThreadUser ///////////////////////////////////////////////////////////////////// static __thread IContainerThread::Work *callbackInProgress = NULL; class ReturnToMe : public IContainerThread::Work { private: IContainerThreadUser::Ref _user; public: ReturnToMe(IContainerThreadUser::Ref user, Request *original) : Work(original), _user(user) { } virtual void inThread() { CurrentRequest::Recursive cr(getOriginal()); callbackInProgress = this; _user->handleRequestInThread(getOriginal()); // The ReturnToMe object will be deleted in a moment by the main event // loop. Normally that will cause the original message to be deleted. // However, someone can call callbackInProgress->clearOriginal() and then // the original will not be deleted. See // IContainerThreadUser::keepOriginal(). callbackInProgress = NULL; } }; void IContainerThreadUser::returnToMe(Ref me, Request *original) const { assert(this == &*me); getContainer()->addToQueue(new ReturnToMe(me, original)); } void IContainerThreadUser::keepOriginal() const { callbackInProgress->clearOriginal(); } bool IContainerThreadUser::inMyThread() const { return getContainer() == IContainerThread::current(); } std::string IContainerThreadUser::debugName() { std::string result = unmangledName(typeid (*this)); result += '('; result += pointerToString(this); result += ')'; return result; } ///////////////////////////////////////////////////////////////////// // IBeforeSleepCallbacks ///////////////////////////////////////////////////////////////////// void IBeforeSleepCallbacks::wakeAfterMicroSeconds(int64_t us) { // Often say +999, i.e. ciel() rather than round(). wakeAfterMs((us+500)/1000); } void IBeforeSleepCallbacks::wakeAtTime(time_t wakeAt) { const time_t now = time(NULL); if (wakeAt < now) wakeAfterMs(0); else { const time_t seconds = wakeAt - now; if (seconds >= INT_MAX / 1000) wakeAfterMs(INT_MAX); else wakeAfterMs(seconds * 1000); } } ///////////////////////////////////////////////////////////////////// // PollSetWrapper ///////////////////////////////////////////////////////////////////// class PollSetWrapper : public IBeforeSleepCallbacks { private: PollSet _pollSet; int _timeout; public: virtual void waitForRead(int handle) { _pollSet.addForRead(handle); } virtual void waitForWrite(int handle) { _pollSet.addForWrite(handle); } virtual void wakeAfterMs(int timeout) { _timeout = std::min(_timeout, timeout); } PollSetWrapper() : _timeout(std::numeric_limits< int >::max()) { } PollSet &getPollSet() { if (_timeout < std::numeric_limits< int >::max()) _pollSet.setTimeoutMs(_timeout); return _pollSet; } }; ///////////////////////////////////////////////////////////////////// // IContainerThread ///////////////////////////////////////////////////////////////////// static __thread IContainerThread *currentContainerThread = NULL; IContainerThread *IContainerThread::current() { return currentContainerThread; } ///////////////////////////////////////////////////////////////////// // ContainerThread ///////////////////////////////////////////////////////////////////// class ContainerThread : public ThreadClass, public IContainerThread { private: enum { mtAddUser, mtRemoveUser, mtDoWorkAndDelete, mtDoWorkAndKeep }; std::string createName(std::string const &name, bool makeUnique) { if (!makeUnique) return name; return name + ' ' + pointerToString(this); } ~ContainerThread() { assert(false); } SelectableRequestQueue _incoming; std::set< IContainerThreadUser::Ref > _users; class UserRequest : public Request { public: const IContainerThreadUser::Ref user; UserRequest(IContainerThreadUser::Ref user) : Request(NULL), user(user) { } }; std::set< IContainerThreadUser::Ref > _needsBeforeSleep; protected: void threadFunction() { currentContainerThread = this; ThreadMonitor &tm = ThreadMonitor::find(); while (true) { tm.setState("Prepair pollSet"); PollSetWrapper pollSet; pollSet.waitForRead(_incoming.getWaitHandle()); _needsBeforeSleep = _users; while (!_needsBeforeSleep.empty()) { const IContainerThreadUser::Ref user = *_needsBeforeSleep.begin(); _needsBeforeSleep.erase(_needsBeforeSleep.begin()); if (_users.count(user)) // Hasn't been removed. user->beforeSleep(pollSet); } pollSet.getPollSet().poll(); tm.setState("Read from queue"); _incoming.resetWaitHandle(); while (Request *current = _incoming.getRequest()) { CurrentRequest::Simple cr(current); switch (current->callbackId) { case mtAddUser: { UserRequest *request = dynamic_cast< UserRequest * >(current); _users.insert(request->user); TclList msg; msg<user->debugName(); sendToLogFile(msg); // Here are some samples of this message. // {Thu Aug 6 09:52:05 2020} 0 ContainerThread.C 192 threadFunction mtAddUser thread {batch ContainerThread} user AutoSymbolListManagerImpl(0xebcd48) // {Thu Aug 6 09:52:05 2020} 0 ContainerThread.C 192 threadFunction mtAddUser thread {primary ContainerThread} user MultiCast(0x7f2850002e10) // {Thu Aug 6 09:52:05 2020} 0 ContainerThread.C 192 threadFunction mtAddUser thread {incoming top_list multicast} user AlertRecordMultiCastReceiver(0x118a210) // {Thu Aug 6 09:52:05 2020} 0 ContainerThread.C 192 threadFunction mtAddUser thread TopListMicroService user {NewWorkerCluster(TopListMicroService fast)} // {Thu Aug 6 09:52:05 2020} 0 ContainerThread.C 192 threadFunction mtAddUser thread TopListMicroService user {NewWorkerCluster(TopListMicroService slow)} request->user->initializeInThread(); break; } case mtRemoveUser: { UserRequest *request = dynamic_cast< UserRequest * >(current); _users.erase(request->user); break; } case mtDoWorkAndDelete: { Work *request = dynamic_cast< Work * >(current); request->inThread(); break; } case mtDoWorkAndKeep: { Work *request = dynamic_cast< Work * >(current); request->inThread(); current = NULL; break; } case DeleteSocketThread::callbackId: for (std::set< IContainerThreadUser::Ref >::const_iterator it = _users.begin(); it != _users.end(); it++) { IContainerThreadUser::Ref user = *it; user->socketClosed(current->getSocketInfo()); } break; } delete current; } tm.setState("Notify users"); for (std::set< IContainerThreadUser::Ref >::const_iterator it = _users.begin(); it != _users.end(); it++) { IContainerThreadUser::Ref user = *it; user->awake(pollSet.getPollSet().woken()); } } } public: ContainerThread(std::string const &name, bool makeUnique) : ThreadClass(createName(name, makeUnique)), _incoming(getName()) { startThread(); } ContainerThread(std::string const &name, bool makeUnique, int queueBacklogPeriod) : ThreadClass(createName(name, makeUnique)), _incoming(getName(), queueBacklogPeriod) { startThread(); } virtual void addUser(IContainerThreadUser::Ref user) { UserRequest *request = new UserRequest(user); request->callbackId = mtAddUser; _incoming.newRequest(request); } virtual void removeUser(IContainerThreadUser::Ref user) { UserRequest *request = new UserRequest(user); request->callbackId = mtRemoveUser; _incoming.newRequest(request); } virtual void addToQueue(Work *work, bool autoDelete) { work->callbackId = autoDelete?mtDoWorkAndDelete:mtDoWorkAndKeep; _incoming.newRequest(work); } virtual std::string getThreadName() const { return getName(); } void requestBeforeSleep(IContainerThreadUser::Ref const &user) { _needsBeforeSleep.insert(user); } }; ///////////////////////////////////////////////////////////////////// // IContainerThread ///////////////////////////////////////////////////////////////////// IContainerThread *IContainerThread::primary() { // Note: A static initializer in a function is guaranteed to be thread // safe. That is a feature of g++. There is much discussion (and not // much clarity) on what the C++ standard says. Microsoft's C++ compiler // is explicitly not thread safe. static IContainerThread *instance = new ContainerThread("primary ContainerThread", false); return instance; } IContainerThread *IContainerThread::batch() { static IContainerThread *instance = new ContainerThread("batch ContainerThread", false); return instance; } IContainerThread *IContainerThread::create(std::string name, bool makeNameUnique, int queueLogPeriod) { return new ContainerThread(name, makeNameUnique, queueLogPeriod); } ///////////////////////////////////////////////////////////////////// // ForeverThreadUser ///////////////////////////////////////////////////////////////////// void ForeverThreadUser::requestBeforeSleep() { dynamic_cast< ContainerThread * >(getContainer()) ->requestBeforeSleep(_keepMeAlive); }