#include "../shared/XmlSupport.h"
#include "../shared/ReplyToClient.h"
#include "../shared/TclUtil.h"
#include "../shared/CurrentRequest.h"

#include "ScriptDispatcher.h"
#include "TclReplyToClient.h"

/////////////////////////////////////////////////////////////////////
// TclReplyToClient
/////////////////////////////////////////////////////////////////////

// Decode a channel description held in a Tcl object.  The object's string
// form is extracted and passed to the std::string overload below.
//
// Returns TCL_OK and fills in destination on success, TCL_ERROR otherwise.
int TclReplyToClient::convertFromTcl(Tcl_Interp *interp,
                                     ChannelInfo &destination,
                                     Tcl_Obj *source)
{
  const std::string encoded = getString(source);
  return convertFromTcl(interp, destination, encoded);
}

// Decode a channel description from its string form.
//
// The string is handed to _serverSigner.decodeInts(); a usable channel is
// one that decodes to exactly two integers: the socket id followed by the
// message id.
//
// interp may be null.  In that case a failure is reported through the return
// value only and no error message is stored anywhere.
int TclReplyToClient::convertFromTcl(Tcl_Interp *interp,
                                     ChannelInfo &destination,
                                     std::string const &source)
{
  std::vector< int64_t > decoded;
  _serverSigner.decodeInts(source, decoded);
  if (decoded.size() == 2)
  {
    destination.socketId = decoded[0];
    destination.messageId = ExternalRequest::MessageId(decoded[1]);
    return TCL_OK;
  }
  // Wrong number of fields.  Leave destination untouched.
  if (interp)
    Tcl_SetObjResult(interp, makeTclString("Invalid channel: " + source));
  return TCL_ERROR;
}

// Static entry point handed to Tcl.  clientData is the TclReplyToClient
// object that registered the command; forward to its member function.
int TclReplyToClient::getOutputChannelCmd(ClientData clientData,
                                          Tcl_Interp *interp,
                                          int objc,
                                          Tcl_Obj *const objv[])
{
  TclReplyToClient *const self = static_cast< TclReplyToClient * >(clientData);
  return self->getOutputChannelCmd(interp, objc, objv);
}

// Implements the Tcl command registered as ti::get_current_channel.
//
// Usage:  ti::get_current_channel ?default?
//
// On success the interpreter result is an encoded description of where a
// reply to the current request should go.  When no such channel exists the
// command either raises a Tcl error (no default given) or returns the
// caller's default value (default given).
int TclReplyToClient::getOutputChannelCmd(Tcl_Interp *interp,
                                          int objc,
                                          Tcl_Obj *const objv[])
{
  assert(inMyThread());
  ExternalRequest *const request =
    dynamic_cast< ExternalRequest * >(CurrentRequest::get());

  if ((objc != 1) && (objc != 2))
  {
    Tcl_WrongNumArgs(interp, 1, objv, "?default?");
    return TCL_ERROR;
  }
  // With an explicit default we never throw for a missing channel.
  const bool haveDefault = (objc == 2);

  if (!request)
  {
    // There is no original request to send a response to.
    if (haveDefault)
    {
      Tcl_SetObjResult(interp, objv[1]);
      return TCL_OK;
    }
    Tcl_SetObjResult(interp,
                     makeTclString("Can't find output channel. Invalid context."));
    return TCL_ERROR;
  }

  if (request->getResponseMessageId().isEmpty())
  {
    // We know who sent the request, but they did not ask for an answer.
    if (haveDefault)
    {
      Tcl_SetObjResult(interp, objv[1]);
      return TCL_OK;
    }
    Tcl_SetObjResult(interp,
                     makeTclString("Can't find output channel. Client is not expecting a response."));
    return TCL_ERROR;
  }

  // Remember the socket so a later reply can look it up by serial number.
  initializeSocket(request->getSocketInfo());

  // The channel is the pair (socket serial number, response message id),
  // encoded by _serverSigner.
  std::vector< int64_t > fields;
  fields.reserve(2);
  fields.push_back(SocketInfo::getSerialNumber(request->getSocketInfo()));
  fields.push_back(request->getResponseMessageId().getValue());
  const std::string encoded = _serverSigner.encodeInts(fields);
  Tcl_SetObjResult(interp, makeTclString(encoded));
  return TCL_OK;
}

// Static entry point handed to Tcl.  clientData is the TclReplyToClient
// object that registered the command; forward to its member function.
int TclReplyToClient::replyToClientCmd(ClientData clientData,
                                       Tcl_Interp *interp,
                                       int objc,
                                       Tcl_Obj *const objv[])
{
  TclReplyToClient *const self = static_cast< TclReplyToClient * >(clientData);
  return self->replyToClientCmd(interp, objc, objv);
}

// Implements the Tcl command registered as ti::reply_to_client.
//
// Usage:  ti::reply_to_client channel body ?headers?
//
// Without headers the body is sent exactly as given.  With headers (a Tcl
// dict) the message is an XmlNode rendered as a string: body is the node's
// text and each dict entry becomes one of the node's properties.
//
// When called from this object's own thread a closed socket is reported as a
// Tcl error.  When called from any other thread the send is queued to the
// right thread and TCL_OK is returned immediately, so a closed socket goes
// unreported.
int TclReplyToClient::replyToClientCmd(Tcl_Interp *interp,
                                       int objc,
                                       Tcl_Obj *const objv[])
{
  if ((objc != 3) && (objc != 4))
  {
    Tcl_WrongNumArgs(interp, 1, objv, "channel body ?headers?");
    return TCL_ERROR;
  }
  const bool withHeaders = (objc == 4);

  ChannelInfo channel;
  if (convertFromTcl(interp, channel, objv[1]) != TCL_OK)
    return TCL_ERROR;

  const std::string body = getString(objv[2]);

  std::string payload;
  if (!withHeaders)
  {
    // No formatting requested.
    payload = body;
  }
  else
  {
    // Wrap the body in a simple XML document.
    XmlNode document;
    document.text = body;

    Tcl_Obj *headerDict = objv[3];
    Tcl_DictSearch cursor;
    Tcl_Obj *key;
    Tcl_Obj *value;
    int finished;
    if (Tcl_DictObjFirst(interp, headerDict, &cursor, &key, &value, &finished)
        != TCL_OK)
      // Not a valid dict.  Tcl_DictObjFirst was given interp for reporting.
      return TCL_ERROR;
    while (!finished)
    {
      document.properties[getString(key)] = getString(value);
      Tcl_DictObjNext(&cursor, &key, &value, &finished);
    }
    Tcl_DictObjDone(&cursor);

    payload = document.asString();
  }

  if (!inMyThread())
  {
    // _canSendResponse is only touched from our own thread, so hop over to
    // that thread rather than guarding the variable with a mutex.  Our style
    // is to switch threads most of the time and to keep mutexes as a last
    // resort.
    doWorkInThread([=] {
      if (SocketInfo *socket =
          getPropertyDefault(_canSendResponse, channel.socketId))
        addToOutputQueue(socket, payload, channel.messageId);
    });
    return TCL_OK;
  }

  // Already in the right thread.  Work synchronously so that a failure can
  // be reported to the script.
  SocketInfo *socket = getPropertyDefault(_canSendResponse, channel.socketId);
  if (!socket)
  {
    Tcl_SetObjResult(interp, makeTclString("Socket is closed."));
    return TCL_ERROR;
  }
  addToOutputQueue(socket, payload, channel.messageId);
  return TCL_OK;
}

// The following never really worked well.  It does what it is supposed to,
// but when we try to use it we run into other problems.  For example, I
// wanted this to be simple enough to work with nc but as soon as nc runs
// out of input it closes half of the socket.  Our software sees that and
// completely closes the socket before any answers can be returned.
int TclReplyToClient::closeWhenOutputQueueIsEmptyCmd(ClientData clientData, Tcl_Interp *interp, int objc, Tcl_Obj *const objv[]) { return ((TclReplyToClient *)clientData) ->closeWhenOutputQueueIsEmptyCmd(interp, objc, objv); } int TclReplyToClient::closeWhenOutputQueueIsEmptyCmd(Tcl_Interp *interp, int objc, Tcl_Obj *const objv[]) { if (objc != 2) { Tcl_WrongNumArgs(interp, 1, objv, "channel"); return TCL_ERROR; } ChannelInfo channelInfo; if (convertFromTcl(interp, channelInfo, objv[1]) != TCL_OK) return TCL_ERROR; invokeIfRequired([=] { if (SocketInfo *socket = getPropertyDefault(_canSendResponse, channelInfo.socketId)) closeWhenOutputQueueIsEmpty(socket); }); return TCL_OK; } int TclReplyToClient::touchConnectionsCmd(ClientData clientData, Tcl_Interp *interp, int objc, Tcl_Obj *const objv[]) { return ((TclReplyToClient *)clientData) ->touchConnectionsCmd(interp, objc, objv); } int TclReplyToClient::touchConnectionsCmd(Tcl_Interp *interp, int objc, Tcl_Obj *const objv[]) { if (objc != 2) { Tcl_WrongNumArgs(interp, 1, objv, "channel_list"); return TCL_ERROR; } std::set< uint64_t > socketIdSet; if (tclListForEach(interp, objv[1], [&](Tcl_Obj *item) { ChannelInfo channelInfo; if (convertFromTcl(interp, channelInfo, item) != TCL_OK) throw BreakAndFail(); socketIdSet.insert(channelInfo.socketId); }) != TCL_OK) return TCL_ERROR; if (!socketIdSet.empty()) { invokeIfRequired([=] { DeadManTimer &deadManTimer = CommandDispatcher::getInstance()->getDeadManTimer(); for (auto it = socketIdSet.begin(); it != socketIdSet.end(); it++) if (SocketInfo *socket = getPropertyDefault(_canSendResponse, *it)) deadManTimer.touchConnection(socket); }); } return TCL_OK; } int TclReplyToClient::isChannelValidCmd(ClientData clientData, Tcl_Interp *interp, int objc, Tcl_Obj *const objv[]) { return ((TclReplyToClient *)clientData) ->isChannelValidCmd(interp, objc, objv); } int TclReplyToClient::isChannelValidCmd(Tcl_Interp *interp, int objc, Tcl_Obj *const objv[]) { assert(inMyThread()); if 
(objc != 2) { Tcl_WrongNumArgs(interp, 1, objv, "channel"); return TCL_ERROR; } bool result; ChannelInfo channelInfo; if (convertFromTcl(NULL, channelInfo, objv[1]) != TCL_OK) { // Not a valid encoding. Maybe a random string, or a value from // before we rebooted, or a person trying to guess a valid value. result = false; } else if (!_canSendResponse.count(channelInfo.socketId)) { // Socket closed. result = false; } else if (channelInfo.messageId.isEmpty()) { // The socket is open, but the client didn't ask for a response. result = false; } else // It makes sense to talk with this channel. result = true; Tcl_SetObjResult(interp, Tcl_NewBooleanObj(result)); return TCL_OK; } // IContainerThreadUser void TclReplyToClient::socketClosed(SocketInfo *socket) { _canSendResponse.erase(SocketInfo::getSerialNumber(socket)); } void TclReplyToClient::initializeSocket(SocketInfo *socket) { assert(inMyThread()); _canSendResponse[SocketInfo::getSerialNumber(socket)] = socket; } void TclReplyToClient::addGetCurrentChannel(Tcl_Interp *interp) { Tcl_CreateObjCommand(interp, "ti::get_current_channel", getOutputChannelCmd, (ClientData)this, NULL); Tcl_CreateObjCommand(interp, "ti::is_channel_valid", isChannelValidCmd, (ClientData)this, NULL); } void TclReplyToClient::addOtherCommands(Tcl_Interp *interp) { Tcl_CreateObjCommand(interp, "ti::reply_to_client", replyToClientCmd, (ClientData)this, NULL); Tcl_CreateObjCommand(interp, "ti::close_when_output_queue_is_empty", closeWhenOutputQueueIsEmptyCmd, (ClientData)this, NULL); Tcl_CreateObjCommand(interp, "ti::touch_connections", touchConnectionsCmd, (ClientData)this, NULL); } TclReplyToClient::TclReplyToClient(IContainerThread *container) : ForeverThreadUser(container) { start(); } TclReplyToClient &TclReplyToClient::instance() { static TclReplyToClient *i = new TclReplyToClient(ScriptDispatcher::getPrimary()->getContainer()); return *i; }