// Implementation of TcpIpConnection (a TCP socket wrapper whose connect() can
// optionally be asynchronous) and TalkWithServer (queues outgoing messages,
// feeds them through _zLibOutputStream into the connection's send buffer, and
// hands replies or aborts to the registered listener via onMessage()/onAbort()).
// NOTE(review): this copy of the file looks damaged -- confirm against the
// original before building. The whole file is collapsed onto a few physical
// lines, so each "//" comment swallows the code that follows it, and text
// between a '<' and a later '>' appears to be missing (for example the header
// names of the leading #include directives and most of connect()).
#include #include #include #include #include #include #include #include #include "MiscSupport.h" #include "ZLibMalloc.h" #include "SimpleLogFile.h" #include "ThreadMonitor.h" #include "TalkWithServer.h" ///////////////////////////////////////////////////////////////////// // TcpIpConnection ///////////////////////////////////////////////////////////////////// TcpIpConnection::TcpIpConnection(std::string const &name) : _name(name), _handle(-1), _status(NotYetOpen), _writeSucceeded(false) { } TcpIpConnection::~TcpIpConnection() { close(); } void TcpIpConnection::connect(std::string address, std::string port, bool asyncConnect) { assert(_status == NotYetOpen); // Protocol #6 is TCP. This comes from /etc/protocols. _handle = socket(AF_INET, SOCK_STREAM, 6); if (_handle == -1) { // An error occurred. This should never happen. All we've done so // far is update the address family and protocol. We haven't tried // to connect to or even name the remote server yet. std::string errorMessage = errorString(); TclList msg; msg<ai_addr, addresses->ai_addrlen); freeaddrinfo(addresses); if ((connectResult != 0) && !(asyncConnect && (errno == EINPROGRESS))) { // An error occurred. // EINPROGRESS is not an error. That's what we expect when we try to // connect asynchronously. TclList msg; msg< "<= 25) // We don't want to be doing this for too long. Just take a break and // we'll come back soon. break; // Try to read more. } switch (successfulReadCount) { case 0: if (toSend.empty()) // This is bad. We were woken up, but there is no work to do. It might // be unavoidable sometimes, but it shouldn't happen a lot. tm.increment(s_read_0_no_write); else // There was nothing to read, but there was something to write. This // happens a lot. For simplicity we have just one wakeup call shared // by the readable and writable events. tm.increment(s_read_0); break; case 1: // This should be a common case. We woke up. We read something. It all // fit in our buffer. 
tm.increment(s_read_1); break; default: // This is not an error. I suspect it's not common, but we won't know // until we try this. The amount of data available for reading was bigger // than our buffer, so we had to do more than one read. tm.increment(s_read + ntoa(successfulReadCount)); break; } if (!toSend.empty()) { ssize_t result = write(_handle,toSend.data(), toSend.size()); //std::cout<<"write() -> "<::iterator it = _atServer.begin(); it != _atServer.end(); it++) { LocalWrapper localWrapper; localWrapper.listener = it->second.listener; localWrapper.cancelId = it->second.cancelId; localWrapper.success = false; localWrapper.streaming = it->second.streaming; localWrapper.clientId = it->second.clientId; _localQueue.push_back(localWrapper); } _atServer.clear(); } } bool TalkWithServer::disconnected() { return _status == Closed; } void TalkWithServer::cancel(CancelId id) { _validResponses.erase(id); } void TalkWithServer::cancelAll() { _validResponses.clear(); } int TalkWithServer::pendingResponseCount() { return _localQueue.size(); } void TalkWithServer::doResponses() { // It is possible that during one of these callbacks, new callbacks // will be added to the list. It is unlikely that a valid response will // appear in the list, but it is certainly possible for someone to disconnect // the session in a callback, so the list will have new error responses. // It is also possible for a request to be canceled while in a callback. // // Strange. The comment above makes sense, but the code below uses iterators // to walk through the list. An iterator could be invalidated if the list // grows. I wonder why I made that comment but didn't do anything about it. // TODO for (std::vector< LocalWrapper >::iterator it = _localQueue.begin(); it != _localQueue.end(); it++) { //TclList msg; //msg<<"doResponses"<cancelId<clientId; //if (it->success) //msg<body; //std::cout<<(std::string)msg<cancelId)) { // Not yet canceled. 
//std::cout<<"doResponses NOT CANCELED :)"<success) it->listener->onMessage(it->body, it->clientId, it->cancelId); else it->listener->onAbort(it->clientId, it->cancelId); if (!it->streaming) // Clean up. No real functionality here, but without this // we might have a memory leak. _validResponses.erase(it->cancelId); } } _localQueue.clear(); } void TalkWithServer::sendMessage(Message const &message) { _outgoing.push_back(message); } TalkWithServer::CancelId TalkWithServer::sendMessage(Message message, IMessageListener *listener, int clientId, bool streaming) { assert(listener); int messageId = 1; while (_atServer.count(messageId)) messageId++; message["message_id"] = ntoa(messageId); ServerWrapper serverWrapper; serverWrapper.listener = listener; serverWrapper.cancelId = getNextCancelId(); _validResponses.insert(serverWrapper.cancelId); serverWrapper.streaming = streaming; serverWrapper.clientId = clientId; _atServer[messageId] = serverWrapper; sendMessage(message); return serverWrapper.cancelId; } bool TalkWithServer::wantsRead() { return _tcpIpConnection.wantsRead(); } bool TalkWithServer::wantsWrite() { return _tcpIpConnection.wantsWrite(); } int TalkWithServer::getHandle() { return _tcpIpConnection.getHandle(); } void TalkWithServer::addStringWithSize(std::string toCompress) { addInt32(toCompress.size()); _zLibOutputStream.add(toCompress); } void TalkWithServer::addInt32(int32_t toCompress) { _zLibOutputStream.add(&toCompress, 4); } void TalkWithServer::wakeUp() { if (_tcpIpConnection.readyToSend() && !_outgoing.empty()) { // Copy outgoing messages from here to the tcp/ip connection. // Don't do this before we have to. Compression will work best if // we don't flush the buffer until the last moment. 
for (std::vector< Message >::const_iterator messageIt = _outgoing.begin(); messageIt != _outgoing.end(); messageIt++) { for (Message::const_iterator fieldIt = messageIt->begin(); fieldIt != messageIt->end(); fieldIt++) { addStringWithSize(fieldIt->first); addStringWithSize(fieldIt->second); } addInt32(0); } _outgoing.clear(); _zLibOutputStream.flush(); _tcpIpConnection.toSend += _zLibOutputStream.output; _zLibOutputStream.output.clear(); } if (!_zLibOutputStream.valid()) { // This should never happen! Maybe out of memory? Or a code error on // our part. TclList msg; msg<listener; // For simplicity, just copy the cancel id. We will check for it // later. It might be slightly more efficient to check for it // here, too, and delete some messages slightly sooner. localWrapper.cancelId = serverWrapper->cancelId; localWrapper.body = std::string(start + 8 + consumed, size); localWrapper.success = true; localWrapper.streaming = serverWrapper->streaming; localWrapper.clientId = serverWrapper->clientId; _localQueue.push_back(localWrapper); if (!serverWrapper->streaming) _atServer.erase(id); //std::cout<<"cancelId="<