#include #include "../shared/ReplyToClient.h" #include "../shared/ThreadClass.h" #include "../shared/GlobalConfigFile.h" #include "MiscThread.h" ///////////////////////////////////////////////////////////////////// // MiscThread // // If you are looking at adding more functionality to this thread, // consider using ../shared/NewWorkerCluster.h instead. // After several attempts that seems to be the best generic worker // thread cluster. ///////////////////////////////////////////////////////////////////// static RequestQueue *incoming; class MiscThread : private ThreadClass { private: enum { mtDeferedXmlReply, mtQuit }; protected: void threadFunction(); public: MiscThread(); ~MiscThread(); static void add(DeferedXmlReply *request); }; void MiscThread::add(DeferedXmlReply *request) { request->callbackId = mtDeferedXmlReply; incoming->newRequest(request); } void MiscThread::threadFunction() { while (true) { while (Request *current = incoming->getRequest()) { switch (current->callbackId) { case mtDeferedXmlReply: { DeferedXmlReply *request = dynamic_cast(current); request->doWork(); break; } case mtQuit: delete current; return; // mtQuit doesn't really work. See note below. // We couldn't use the delete item, either. // Luckily we don't have any state, so we don't // need to look for a delete request. } delete current; } incoming->waitForRequest(); } } MiscThread::MiscThread() : ThreadClass("MiscThread " + pointerToString(this)) { startThread(); } MiscThread::~MiscThread() { assert(false); // This does not work!!!!! // To fix this you would have to find a different way to notify // the thread. Request *request = new Request(NULL); request->callbackId = mtQuit; incoming->newRequest(request); waitForThread(); } void initMiscThread() { assert(!incoming); incoming = new RequestQueue("MiscThread"); new MiscThread; // We used to read the thread count from getConfigItem("misc_thread_count"). // For a long time we've used the default value of 1. At one time we // experimented with other values, but they didn't help much. So there's // no real change in functionality here. I'm just taking away the // possibility that someone would change this. The thread count is now hard // coded to 1. // // I recently realized that this class design has a serious flaw. Normally // each thread has its own queue. So each thread will get its own message // when a socket closes. And we won't delete the socket until every thread // acknowledges that message. // // If you have two or more of these threads all sharing a queue, the // following can happen. // 1) One of these threads starts processing a request for a particular // socket. // 2) Someone tries to close that socket. // 3) The system sends the close message to our shared queue. // 4) The first thread is busy with that request, so a different thread will // grab the close message. // 5) That thread will immediately delete the close message, acknowledging // that we're done with that socket. // 6) The system will see as many acknowledgments as it expected, so it will // delete the socket object. THIS IS THE PROBLEM. The first thread // is still working on a request for that socket. // 7) The first thread will finally finish working on the XML for its // request. // 8) The first thread will pass it's result on to the output thread so // the result can be sent back to the user. // 9) The queues might not notice that this is a deleted socket because // we didn't follow the rules. // 10) If we're lucky the assertions in the queue will catch the attempt to // use a deleted object and produce a helpful core dump. More likely // we'll crash in a mysterious way. Possibly we'd send this message to // a different client. } ///////////////////////////////////////////////////////////////////// // DeferedXmlReply ///////////////////////////////////////////////////////////////////// static std::string API = "API"; void DeferedXmlReply::doWork() { addToOutputQueue(getSocketInfo(), _node.asString(API), _messageId); } void DeferedXmlReply::send() { MiscThread::add(this); } void DeferedXmlReply::send(SocketInfo *socket, XmlNode &node, ExternalRequest::MessageId messageId) { DeferedXmlReply *request = new DeferedXmlReply(socket); request->_node.swap(node); request->_messageId = messageId; request->send(); }