#include #include #include "../shared/Messages.h" #include "../shared/MiscSupport.h" #include "../shared/CommandDispatcher.h" #include "../shared/LogFile.h" #include "../shared/GlobalConfigFile.h" #include "../shared/XmlSupport.h" #include "../shared/ReplyToClient.h" #include "../shared/ThreadMonitor.h" #include "MimeBase64.h" #include "OneTimeTimeout.h" #include "ProxyMainLoop.h" ///////////////////////////////////////////////////////////////////// // Global ///////////////////////////////////////////////////////////////////// static bool verboseDebugging() { static bool value = getConfigItem("verbose_debugging") == "1"; return value; } static void sendGenericSuccessResponse(HttpRequest *httpRequest) { ScgiToServer response; XmlNode responseBody; responseBody.properties["STATUS"] = "success"; response._body = responseBody.asString(); httpRequest->sendResponse(response); } ///////////////////////////////////////////////////////////////////// // Connection ///////////////////////////////////////////////////////////////////// class Connection { private: static std::string getBaseDir(HttpRequest *request); static std::map< SocketInfo *, Connection * > _byServerSocket; static std::map< SocketInfo *, Connection * > _byReadSocket; static std::map< std::string, Connection * > _byCookie; DeadManTimer &_deadManTimer; SocketInfo *_server; HttpRequest *_pendingReadRequest; std::string _lastReadBody; int _lastReadId; struct OutgoingMessage { int64_t id; std::string data; }; std::deque< OutgoingMessage > _availableToRead; std::string _notYetParsed; void checkForRead(); void checkForDelayedDisconnect(); std::string::size_type _maxReadMessageSize; int _lastWriteId; std::string _cookie; bool _valid; bool _serverSideClosed; void resetTimer(); public: static Connection *findByServer(SocketInfo *socket); static Connection *findByRead(SocketInfo *socket); static Connection *find(std::string cookie); static Connection *find(HttpRequest *request); static void deleteAll(); 
Connection(HttpRequest *request, InputListener &serverListener, DeadManTimer &deadManTimer); ~Connection(); void clientToServer(HttpRequest *httpRequest); void clientRead(HttpRequest *httpRequest); void internalToServer(std::string data); void serverToClient(std::string data); void serverClosedConnection(); bool getValid() const; void removeReadRequest(); void readTimeout(); }; std::map< SocketInfo *, Connection * > Connection::_byServerSocket; std::map< SocketInfo *, Connection * > Connection::_byReadSocket; std::map< std::string, Connection * > Connection::_byCookie; void Connection::resetTimer() { // This timer says how long it's been since we've heard from the client. // The client should send us a disconnect request when he is done. But // there is nothing to enforce that. (In theory the OS will automatically // notify us when a normal TCP/IP socket is closed, but in practice it // helps to use a deadman timer even there.) if (!_serverSideClosed) // It's possible that the server end has already been shut down, but the // client is still allowed to read the last bit of data. In that case we // keep the data until the client reads it or the dead man timer expires. // In that case we set a finite time for the timer; we stop telling the // dead man timer to extend the timeout. // // This test is not strictly required. We could allow the client to keep // trying. But I like this as a safety mechanism. Even if other things // go wrong, we know the Connection object will only live a short time // after the server closes the corresponding socket. // // There is no change in functionality here. If there are network problems // the HTTP tunnel can retry several times before giving up and breaking // the connection. At that point the main program will create another // connection, as another way of retrying. We're just slightly altering // the point where one component or the other is responsible for an // automatic retry. 
_deadManTimer.touchConnection(_server); } void Connection::checkForRead() { // If data comes from the server first, it gets added to _availableToRead // until a client request comes to take it. If the client request comes // first it sits in _pendingReadRequest until there is data to fulfill it. // Either way, this checks if both are now ready. If so, it copies data // from the queue to the client. if (!_pendingReadRequest) { if (verboseDebugging()) { TclList logMsg; logMsg<fromServer.getPostValues(postValues); const int currentId = strtolDefault(postValues["sequence"], -1); // As with the client write requests, we expect the client to start from // 1 and increment by 1 each time. But it's not our job to look for // programming errors. So we really only consider the three cases listed // below: 1) older than the last request, must be a mistake, 2) same as the // last request, send the same answer, or 3) newer request, throw out any // old data, send the new data, and save this in case we are asked to repeat // ourselves. if (currentId < _lastReadId) { // This is a very old request (or a completely invalid request). It's // safe to assume this has already been abandoned by the client, so it // doesn't matter what we send. Or the client is very confused, in which // case we ask it to disconnect. removeReadRequest(); if (verboseDebugging()) { TclList logMsg; logMsg<sendResponse(response); removeReadRequest(); if (verboseDebugging()) { TclList logMsg; logMsg<= _availableToRead.front().data.size())) { XmlNode &destination = combinedMessage[-1]; OutgoingMessage const &source = _availableToRead.front(); if (source.id) // This might save a few bytes. It's probably not a big deal because // message id = 0 often means "don't send me a reply at all." But // this is consistent with our normal philosophy of not repeating // defaults or anything obvious. 
destination.properties["ID"] = ntoa(source.id); if (XmlNode::binarySafe(source.data)) { // Nothing will get lost destination.useCdata = true; destination.text = source.data; } else { destination.properties["FORMAT"] = "base64"; destination.text = base64Encode(source.data); } spaceAvailable -= source.data.size(); _availableToRead.pop_front(); } _lastReadBody = combinedMessage.asString(); _lastReadId = currentId; ScgiToServer response; response._body = _lastReadBody; _pendingReadRequest->sendResponse(response); removeReadRequest(); if (verboseDebugging()) { TclList logMsg; logMsg<getSocketInfo()); delete _pendingReadRequest; _pendingReadRequest = NULL; } } void Connection::readTimeout() { assert(_pendingReadRequest); // This is just a stripped down version of Connection::checkForRead() where // we know we have a new read request, and no new data to give it, so // we just send an empty body. PropertyList postValues; _pendingReadRequest->fromServer.getPostValues(postValues); const int currentId = strtolDefault(postValues["sequence"], -1); XmlNode combinedMessage; combinedMessage.properties["STATUS"] = "success"; _lastReadBody = combinedMessage.asString(); _lastReadId = currentId; ScgiToServer response; response._body = _lastReadBody; _pendingReadRequest->sendResponse(response); removeReadRequest(); if (verboseDebugging()) { TclList logMsg; logMsg<getSocketInfo()] = this; checkForRead(); checkForDelayedDisconnect(); } bool Connection::getValid() const { return _valid; } void Connection::clientToServer(HttpRequest *httpRequest) { // We could call resetTimer() here, but it seems redundant at best. The // important thing is that the client is listening. PropertyList postValues; httpRequest->fromServer.getPostValues(postValues); const int currentId = strtolDefault(postValues["sequence"], -1); // We expect the client to start from 1 and increment by 1 every time. We // don't check that carefully. 
This logic is only aimed at finding duplicate // messages, not programming errors. Note that we ignore all old messages, // not just the most recent one, in case messages are really slow. We saw // that sometimes on the YAWS proxy. if (currentId <= _lastWriteId) { // If the id is a duplicate, then we've already received this // message. if (verboseDebugging()) { TclList logMsg; logMsg<fromServer.getBody(); TclList headers; for (PropertyList::const_iterator it = httpRequest->fromServer.getHeaders().begin(); it != httpRequest->fromServer.getHeaders().end(); it++) headers<first<second; logMsg<size; if (bodyLength < 0) { // This makes no sense. And it could cause more problems in our // processing here. TclList msg; msg<messageId; outgoingMessage.data = _notYetParsed.substr(messageStart + sizeof(Header), bodyLength); _availableToRead.push_back(outgoingMessage); messageStart += bodyLength + sizeof(Header); } if (messageStart > 0) { // At least one message was parsed. _notYetParsed.erase(0, messageStart); checkForRead(); } } void Connection::serverClosedConnection() { // The server hung up on us. Naively we'd immediately call // DeleteSocketThread::deleteSocket(). That's what // ../shared/CommandDispatcher.C does when it gets the same message // (request->atEOF()) from the InputListener object. // // We may need to keep this alive for a short time so the client can read the // last bytes that the server sent before closing the connection. And we // need to wait for a confirmation from the client that it received that // data. Or until the dead man timer goes off. Remember that the dead man // timer (and other parts of our code) uses the socket object. As soon as // we call DeleteSocketThread::deleteSocket(), the dead man timer will // immediately stop caring about this socket. _serverSideClosed = true; // Ideally we'd close() the socket here so we could immediately return some // resources to the O/S. But the rest of our libraries aren't really set up // to handle that. 
You can only be sure that no one is trying to use the // handle after we completely dispose of the socket object. :( checkForDelayedDisconnect(); } void Connection::checkForDelayedDisconnect() { if (_serverSideClosed && _pendingReadRequest) { // _serverSideClosed implies that we are not going to get any more data // from the server. _pendingReadRequest implies that the client has // read all data that the server has already sent, and the client has // confirmed receiving that data. Now we can finish what the server // started; we can tell the client that the server disconnected. ThreadMonitor::find().increment("delayed disconnect"); DeleteSocketThread::deleteSocket(_server); } } Connection *Connection::find(HttpRequest *request) { PropertyList getValues; request->fromServer.getGetValues(getValues); const std::string cookie = getValues["cookie"]; return find(cookie); } Connection *Connection::findByServer(SocketInfo *socket) { return getPropertyDefault(_byServerSocket, socket); } Connection *Connection::findByRead(SocketInfo *socket) { return getPropertyDefault(_byReadSocket, socket); } Connection *Connection::find(std::string cookie) { return getProperty(_byCookie, cookie, (Connection *)NULL); } void Connection::deleteAll() { while (!_byServerSocket.empty()) { delete _byServerSocket.begin()->second; } } Connection::~Connection() { removeReadRequest(); if (_server) { DeleteSocketThread::deleteSocket(_server); _byServerSocket.erase(_server); } if (!_cookie.empty()) { _byCookie.erase(_cookie); } } // The initial request might have been for a url like // http://www.trade-ideas.com/jsproxy/Connect.tcl. Previously we'd just // strip off the current script to get http://www.trade-ideas.com/jsproxy/ . // Then we could add other script names to that base. The new system will // come up with a unique name depending on the short network name of the // machine we're running on. For example it might return // http://www.trade-ideas.com/jsproxy-dice/ . 
By default we use the old // system. If a certain header is set, we use the new system, instead. // Presumably our reverse proxy will be setting that header. // // Test code: // I was on dice. Lighttpd on dice pointed to this program running on dice. // [phil@dice js_proxy_server]$ curl 'http://127.0.0.1/jsproxy/Connect.tcl?destination=live' // // curl --header 'X-unique-base-dir: jsproxy-' 'http://127.0.0.1/jsproxy/Connect.tcl?destination=live' // std::string Connection::getBaseDir(HttpRequest *request) { if (verboseDebugging()) { TclList logMsg; logMsg<fromServer.getHeaders(); //LogFile::primary().sendString(msg, request->getSocketInfo()); // Interesting. I sent 'X-unique-base-dir: jsproxy-' as the header using // curl. The value remained intact, but the key changed to // "HTTP_X_UNIQUE_BASE_DIR". const std::string uniqueBaseDir = getPropertyDefault(request->fromServer.getHeaders(), "HTTP_X_UNIQUE_BASE_DIR"); if (uniqueBaseDir.empty()) // The traditional way return request->fromServer.locationBase(); else return request->fromServer.httpRootDir() + uniqueBaseDir + getShortHostName() + '/'; } Connection::Connection(HttpRequest *request, InputListener &serverListener, DeadManTimer &deadManTimer) : _deadManTimer(deadManTimer), _server(NULL), _pendingReadRequest(NULL), _lastReadId(0), _maxReadMessageSize(250000), _lastWriteId(0), _valid(false), _serverSideClosed(false) { PropertyList getValues; request->fromServer.getGetValues(getValues); const std::string destination = getValues["destination"]; if (verboseDebugging()) { TclList logMsg; logMsg<getSocketInfo()); return; } _server = new SocketInfo(host, port); if (_server->getValid() != SocketInfo::valid) { TclList logMsg; logMsg<<"Unable to open server socket" <getSocketInfo()); ThreadMonitor::find().increment(logMsg); return; } resetTimer(); // Start the timer as soon as we connect.
char buffer[100]; static unsigned int randomSeed = time(NULL); static unsigned int counter = 0; sprintf(buffer, "%p_%x_%x", _server, counter, ((int)(rand_r(&randomSeed)^time(NULL)))); _cookie = buffer; counter++; TclList logMsg; logMsg<sendResponse(response); serverListener.listenToNewSocket(_server); } ///////////////////////////////////////////////////////////////////// // ProxyMainLoop ///////////////////////////////////////////////////////////////////// void ProxyMainLoop::prepairHttpResponse(HttpRequest *request, ScgiToServer &response) { // This is typically an error. However, if the client asked to disconnect, // then this is good! XmlNode responseBody; responseBody.properties["STATUS"] = "disconnected"; response._body = responseBody.asString(); } void ProxyMainLoop::onHttpRequest(HttpRequest *httpRequest) { httpRequest->callbackId = mtHttpRequest; _incoming.newRequest(httpRequest); } ProxyMainLoop::ProxyMainLoop() : ThreadClass("ProxyMainLoop"), _incoming("ProxyMainLoop"), _serverListener(&_incoming, mtServerInput) { // If we haven't heard from the client in 30 seconds, we break the // connection. That seems reasonable. In TI Pro sometimes we have to // set the timeout for the proxy to 10 seconds, so this has to be at // least 10 seconds. _deadManTimer.setDefaultTimeout(30); startThread(); } ProxyMainLoop::~ProxyMainLoop() { Request *r = new Request(NULL); r->callbackId = mtQuit; _incoming.newRequest(r); waitForThread(); Connection::deleteAll(); } static const std::string S_REMOTE_ADDR = "REMOTE_ADDR"; static const std::string S_UNKNOWN = "unknown"; void ProxyMainLoop::clientConnect(HttpRequest *httpRequest) { Connection *c = new Connection(httpRequest, _serverListener, _deadManTimer); if (c->getValid()) { const std::string remote = getProperty(httpRequest->fromServer.getHeaders(), S_REMOTE_ADDR, S_UNKNOWN); c->internalToServer("command=proxy_info&auth=auth&remote=" + urlEncode(remote) + "\r\n" // This is the only mode we support, so we set it // here. 
The client never sees this directly. "command=set_output&mode=simple64\r\n"); } else { delete c; // And give the default response. } } void ProxyMainLoop::clientRead(HttpRequest *httpRequest) { if (Connection *c = Connection::find(httpRequest)) { c->clientRead(httpRequest); } else delete httpRequest; } void ProxyMainLoop::clientWrite(HttpRequest *httpRequest) { if (Connection *c = Connection::find(httpRequest)) { c->clientToServer(httpRequest); sendGenericSuccessResponse(httpRequest); } } void ProxyMainLoop::clientDisconnect(HttpRequest *httpRequest) { if (Connection *c = Connection::find(httpRequest)) delete c; } void ProxyMainLoop::clientInfo(HttpRequest *httpRequest) { // This is like the http://php.net/manual/en/function.phpinfo.php command. XmlNode responseBody; for (PropertyList::const_iterator it = httpRequest->fromServer.getHeaders().begin(); it != httpRequest->fromServer.getHeaders().end(); it++) { XmlNode &p = responseBody["HEADERS"][-1]; p.name = "header"; p.properties["name"] = it->first; p.properties["value"] = it->second; } PropertyList inputs; httpRequest->fromServer.getGetValues(inputs); for (PropertyList::const_iterator it = inputs.begin(); it != inputs.end(); it++) { XmlNode &p = responseBody["GET"][-1]; p.name = "get"; p.properties["name"] = it->first; p.properties["value"] = it->second; } inputs.clear(); httpRequest->fromServer.getPostValues(inputs); for (PropertyList::const_iterator it = inputs.begin(); it != inputs.end(); it++) { XmlNode &p = responseBody["POST"][-1]; p.name = "post"; p.properties["name"] = it->first; p.properties["value"] = it->second; } std::string body = httpRequest->fromServer.getBody(); if (!body.empty()) responseBody["BODY"].text = body; ScgiToServer response; response._body = responseBody.asString(); httpRequest->sendResponse(response); } void ProxyMainLoop::threadFunction() { ThreadMonitor &tm = ThreadMonitor::find(); while (true) { while (Request *current = _incoming.getRequest()) { if (verboseDebugging()) { TclList 
logMsg; logMsg<callbackId; LogFile::primary().sendString(logMsg); } switch (current->callbackId) { case mtServerInput: { // The server wrote some data, or reported a related event. if (verboseDebugging()) { TclList logMsg; logMsg<callbackId; LogFile::primary().sendString(logMsg); } tm.increment("mtServerInput"); tm.setState("mtServerInput"); SocketInfo *const socket = current->getSocketInfo(); if (Connection *c = Connection::findByServer(socket)) { NewInputRequest *request = dynamic_cast(current); if (request->newSocket()) { // Meaningless for a client socket. We created this // socket ourselves, not another thread. (Normally the // input thread sends this event to the command handler // so the command handler can initialize the dead man // timer for this socket.) } else if (request->atEOF()) { // The server hung up on us. c->serverClosedConnection(); } else { // Normal data. Pass it on to the client. c->serverToClient(request->getNewInput()); if (!c->getValid()) { // The connection object is unhappy for some reason. delete c; } } } break; } case mtHttpRequest: { if (verboseDebugging()) { TclList logMsg; logMsg<callbackId; LogFile::primary().sendString(logMsg); } tm.increment("mtHttpRequest"); tm.setState("mtHttpRequest"); HttpRequest *request = dynamic_cast(current); request->defaultResponse = this; //const std::string relativePath = // request->fromServer.relativePath(); // I couldn't make relative path work on lighttpd. That seems // like a good idea, but it's not really necessary in this // program, so as a workaround I'm looking at // shortScriptName(). Note: I always used shortScriptName() // for Info.tcl to help me test more things. const std::string shortScriptName = request->fromServer.shortScriptName(); if (verboseDebugging()) { TclList logMsg; logMsg<getSocketInfo(); // Is this explicit memory barrier required? I'm not sure // but I am chasing some strange core dumps. 
It seems // plausible, though unlikely, that the compiler might // move the socket = statement below the clientRead() // statement. clientRead() is defined in this file, so the // call might get inlined. Maybe socket = will get put // into the middle of that code. In any case if you read // that code there's nothing obvious (no use of "volatile" // or anything like that) to say to the compiler that we // can't touch the request any more. See the comments in // OneTimeTimeoutThread::add() to learn more about the // core dump that made me add this memory barrier. // // See the comments added in revision 1.21 of Messages.C. // I'm pretty sure I fixed the bug in that file. This file // was particularly good at finding that bug, but I don't // think this file has any flaws. I'm pretty sure that // memory barrier is completely unnecessary. In any case, // I didn't notice any changes after adding the memory // barrier. asm volatile ("" : : : "memory"); clientRead(request); // Note that clientRead() might delete the request object // immediately. Either way, the object is no longer our // responsibility. current = NULL; // We arbitrarily set this to 3 seconds. If we do not get // any data from the alert server in 3 seconds, we send an // empty response to the client, so it doesn't time out. // For the ax_alert_server we expect a ping or similar // at least once per second. Other servers might wait // longer and need this. TimeVal timeout(true); timeout.addMilliseconds(3000); OneTimeTimeoutRequest *timeoutRequest = new OneTimeTimeoutRequest(socket, timeout, mtReadTimeout, &_incoming); // The next line was frequently causing an assertion // failure. That should be fixed now. We are now copying // the socket info from the original request *before* // deleting that request.
OneTimeTimeoutThread::getInstance().submit(timeoutRequest); } else if (shortScriptName == "Write.tcl") { tm.increment("/Write.tcl"); tm.setState("/Write.tcl"); clientWrite(request); } else if (shortScriptName == "Disconnect.tcl") { tm.increment("/Disconnect.tcl"); tm.setState("/Disconnect.tcl"); clientDisconnect(request); } else if (shortScriptName == "Info.tcl") { tm.increment("Info.tcl"); tm.setState("Info.tcl"); clientInfo(request); } else { if (verboseDebugging()) { TclList logMsg; logMsg<callbackId; LogFile::primary().sendString(logMsg); } tm.increment("404"); tm.setState("404"); request->send404Response(); } break; } case mtQuit : { if (verboseDebugging()) { TclList logMsg; logMsg<callbackId; LogFile::primary().sendString(logMsg); } tm.increment("mtQuit"); delete current; return; } case mtReadTimeout: { if (verboseDebugging()) { TclList logMsg; logMsg<callbackId; LogFile::primary().sendString(logMsg); } tm.increment("mtReadTimeout"); tm.setState("mtReadTimeout"); if (Connection *c = Connection::findByRead(current->getSocketInfo())) // A read request has sat around for too long. c->readTimeout(); // else the read request finished before the timeout came. break; } case DeleteSocketThread::callbackId: { if (verboseDebugging()) { TclList logMsg; logMsg<callbackId; LogFile::primary().sendString(logMsg); } tm.increment("DeleteSocketThread::callbackId"); tm.setState("DeleteSocketThread::callbackId"); if (Connection *c = Connection::findByServer(current->getSocketInfo())) { // We have agreed that it is time for the final cleanup for this // socket object. Note that there might be a delay between when // the server first closes the connection and when we get here. // See serverClosedConnection() for more details. Also notice // our use of the dead man timer. That's another way we could // get here, and it is essential to avoid memory leaks. (And there // are other ways that this socket could have been closed.) 
delete c; } else if (Connection *c = Connection::findByRead(current->getSocketInfo())) { // A read request is being held by the object. That // request was canceled. Clean up our end, and act // like the request never came in. c->removeReadRequest(); } break; } } tm.setState("Working"); delete current; } _incoming.waitForRequest(); } }