/*****************************************************************************

$Id: ed.cpp 2291 2006-04-14 03:56:18Z francis $

File:     ed.cpp
Date:     06Apr06

Copyright (C) 2006 by Francis Cianfrocca. All Rights Reserved.
Gmail: garbagecat20

This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2 of the License, or
(at your option) any later version.

This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
GNU General Public License for more details.

You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA

*****************************************************************************/

#include "project.h"


/****************************************
EventableDescriptor::EventableDescriptor
****************************************/

EventableDescriptor::EventableDescriptor (int sd):
	EventCallback (NULL),
	LastRead (0),
	LastWritten (0),
	MySocket (sd),
	bCloseNow (false),
	bCloseAfterWriting (false)
{
	/* There are three ways to close a socket, all of which should
	 * automatically signal to the event machine that this object
	 * should be removed from the polling scheduler.
	 *
	 * First is a hard close (Close()), intended for bad errors or possible
	 * security violations. It immediately closes the connection
	 * and puts this object into an error state (MySocket == -1).
	 *
	 * Second is to set bCloseNow, which will cause the event machine
	 * to delete this object (and thus close the connection in our
	 * destructor) the next chance it gets. bCloseNow also inhibits
	 * the queueing of new outbound data on the socket (but not
	 * necessarily the reading of new data).
	 *
	 * The third way is to set bCloseAfterWriting, which inhibits
	 * the queueing of new outbound data and converts to a close as soon
	 * as everything in the outbound queue has been written.
	 * bCloseAfterWriting is really for use only by protocol handlers
	 * (for example, HTTP writes an HTML page and then closes the
	 * connection). All of the error states we generate internally
	 * cause an immediate close to be scheduled, which may have the
	 * effect of discarding outbound data.
	 */

	if (sd == -1)
		throw std::runtime_error ("bad eventable descriptor");
	CreatedAt = gCurrentLoopTime;
}


/*****************************************
EventableDescriptor::~EventableDescriptor
*****************************************/

EventableDescriptor::~EventableDescriptor()
{
	// Notify the registered callback (if any) that this binding is going
	// away, then make sure the socket is closed.
	if (EventCallback)
		(*EventCallback)(GetBinding().c_str(), EventMachine_t::CONNECTION_UNBOUND, NULL, 0);
	Close();
}


/*************************************
EventableDescriptor::SetEventCallback
*************************************/

void EventableDescriptor::SetEventCallback (void(*cb)(const char*, int, const char*, int))
{
	// cb receives (binding, event-code, data, data-length).
	EventCallback = cb;
}


/**************************
EventableDescriptor::Close
**************************/

void EventableDescriptor::Close()
{
	/* Close the socket right now. Intended for emergencies.
	 * Safe to call more than once: after the first call MySocket is -1,
	 * which is also the state ShouldDelete() treats as "already closed".
	 * shutdown(..., 1) disables further sends (SHUT_WR) before close().
	 */
	if (MySocket != -1) {
		shutdown (MySocket, 1);
		close (MySocket);
		MySocket = -1;
	}
}


/*********************************
EventableDescriptor::ShouldDelete
*********************************/

bool EventableDescriptor::ShouldDelete()
{
	/* For use by a socket manager, which needs to know if this object
	 * should be removed from scheduling events and deleted.
	 * Has an immediate close been scheduled, or are we already closed?
	 * If either of these are the case, return true. In theory, the manager will
	 * then delete us, which in turn will make sure the socket is closed.
	 * Note, if bCloseAfterWriting is true, we check a virtual method to see
	 * if there is outbound data to write, and only request a close if there is none.
	 */

	return ((MySocket == -1) || bCloseNow || (bCloseAfterWriting && (GetOutboundDataSize() <= 0)));
}


/******************************************
ConnectionDescriptor::ConnectionDescriptor
******************************************/

ConnectionDescriptor::ConnectionDescriptor (int sd):
	EventableDescriptor (sd),
	bConnectPending (false),
	OutboundDataSize (0)
{
}


/*******************************************
ConnectionDescriptor::~ConnectionDescriptor
*******************************************/

ConnectionDescriptor::~ConnectionDescriptor()
{
	// Run down any stranded outbound data (pages that were queued but never sent).
	for (size_t i=0; i < OutboundPages.size(); i++)
		OutboundPages[i].Free();
}


/**************************************************
STATIC: ConnectionDescriptor::SendDataToConnection
**************************************************/

int ConnectionDescriptor::SendDataToConnection (const char *binding, const char *data, int data_length)
{
	/* Look up a connection by its binding string and queue data on it.
	 * Returns the value of SendOutboundData, or -1 if the binding does not
	 * name a live ConnectionDescriptor.
	 */
	ConnectionDescriptor *cd = dynamic_cast <ConnectionDescriptor*> (Bindable_t::GetObject (binding));
	return cd ? cd->SendOutboundData (data, data_length) : -1;
}


/*********************************************
STATIC: ConnectionDescriptor::CloseConnection
*********************************************/

void ConnectionDescriptor::CloseConnection (const char *binding, bool after_writing)
{
	/* Schedule a close on the connection named by binding.
	 * after_writing selects the bCloseAfterWriting mode (drain the outbound
	 * queue first); otherwise the close happens at the next opportunity.
	 * Unknown bindings are silently ignored.
	 */
	ConnectionDescriptor *cd = dynamic_cast <ConnectionDescriptor*> (Bindable_t::GetObject (binding));
	if (cd) {
		if (after_writing)
			cd->bCloseAfterWriting = true;
		else
			cd->bCloseNow = true;
	}
}


/**************************************
ConnectionDescriptor::SendOutboundData
**************************************/

int ConnectionDescriptor::SendOutboundData (const char *data, int length)
{
	/* Copy length bytes of data onto the end of the outbound queue.
	 * Returns the number of bytes queued: 0 if a close has already been
	 * scheduled (no new data is accepted) or if length is 0.
	 * Throws std::runtime_error on a negative length, on a NULL pointer
	 * with a positive length, or if the copy cannot be allocated.
	 *
	 * Highly naive and incomplete implementation.
	 * There's no throttle for runaways (which should abort only this connection
	 * and not the whole process), and no coalescing of small pages.
	 */

	if (bCloseNow || bCloseAfterWriting)
		return 0;

	// A negative length would turn into malloc(0) followed by a huge memcpy.
	if ((length < 0) || (!data && (length > 0)))
		throw std::runtime_error ("bad outbound data");

	// Nothing to send; don't allocate and queue an empty page.
	if (length == 0)
		return 0;

	char *buffer = (char *) malloc (length + 1);
	if (!buffer)
		throw std::runtime_error ("no allocation for outbound data");
	memcpy (buffer, data, length);
	buffer [length] = 0; // Impute a null-terminator, just for convenience.
	OutboundPages.push_back (OutboundPage (buffer, length));
	OutboundDataSize += length;
	return length;
}


/***********************************
ConnectionDescriptor::SelectForRead
***********************************/

bool ConnectionDescriptor::SelectForRead()
{
	/* A connection descriptor is always scheduled for read,
	 * UNLESS it's in a pending-connect state.
	 * On Linux, unlike some other Unixes, a nonblocking socket on which
	 * connect has been called does NOT necessarily select
	 * both readable and writable in case of error.
	 * The socket will select writable when the disposition
	 * of the connect is known. On the other hand, a socket
	 * which successfully connects and selects writable may
	 * indeed have some data available on it, so it will
	 * select readable in that case, violating expectations!
	 * So we will not poll for readability until the socket
	 * is known to be in a connected state.
	 */

	return !bConnectPending;
}


/************************************
ConnectionDescriptor::SelectForWrite
************************************/

bool ConnectionDescriptor::SelectForWrite()
{
	/* Cf the notes under SelectForRead.
	 * In a pending-connect state, we ALWAYS select for writable.
	 * In a normal state, we only select for writable when we
	 * have outgoing data to send.
	 */

	if (bConnectPending)
		return true;
	return (GetOutboundDataSize() > 0);
}


/**************************
ConnectionDescriptor::Read
**************************/

void ConnectionDescriptor::Read()
{
	/* Read and dispatch data on a socket that has selected readable.
	 * It's theoretically possible to get and dispatch incoming data on
	 * a socket that has already been scheduled for closing or close-after-writing.
	 * In those cases, we'll leave it up to the protocol handler to "do the
	 * right thing" (which probably means to ignore the incoming data).
	 */

	int sd = GetSocket();
	assert (sd != -1);

	int total_bytes_read = 0;
	char readbuffer [16 * 1024];

	// Don't read just one buffer and then move on. This is faster
	// if there is a lot of incoming.
	// But don't read indefinitely. Give other sockets a chance to run.
	for (int i=0; i < 10; i++) {
		// Leave room for the null terminator imputed below.
		int r = recv (sd, readbuffer, sizeof(readbuffer) - 1, 0);

		if (r > 0) {
			total_bytes_read += r;
			LastRead = gCurrentLoopTime;

			readbuffer [r] = 0; // Impute a null-terminator, just for convenience.
			if (EventCallback)
				(*EventCallback)(GetBinding().c_str(), EventMachine_t::CONNECTION_READ, readbuffer, r);
		}
		else if (r == 0) {
			// Orderly shutdown by the peer.
			break;
		}
		else {
			// Basically a would-block, meaning we've read everything there is to read.
			break;
		}
	}

	if (total_bytes_read == 0) {
		// If we read no data on a socket that selected readable,
		// it generally means the other end closed the connection gracefully.
		// NOTE(review): a recv() error (including a spurious EAGAIN) on the
		// first pass also lands here and schedules a close.
		bCloseNow = true;
	}
}


/***************************
ConnectionDescriptor::Write
***************************/

void ConnectionDescriptor::Write()
{
	/* A socket which is in a pending-connect state will select
	 * writable when the disposition of the connect is known.
	 * At that point, check to be sure there are no errors,
	 * and if none, then promote the socket out of the pending
	 * state. Otherwise schedule it for closing.
	 */

	if (bConnectPending) {
		int error = 0;
		socklen_t len = sizeof(error);
		int o = getsockopt (MySocket, SOL_SOCKET, SO_ERROR, &error, &len);
		if ((o == 0) && (error == 0))
			bConnectPending = false;
		else
			bCloseNow = true;
	}
	else {
		_WriteOutboundData();
	}
}


/****************************************
ConnectionDescriptor::_WriteOutboundData
****************************************/

void ConnectionDescriptor::_WriteOutboundData()
{
	/* This is a helper function called by ::Write.
	 * It gathers up to one buffer's worth of queued pages into a local
	 * buffer and hands it to a single send() call. Whatever the kernel
	 * does not accept is pushed back onto the front of the queue, so
	 * byte order is preserved.
	 *
	 * It's possible for a socket to select writable and then no longer
	 * be writable by the time we get around to writing. The kernel might
	 * have used up its available output buffers between the select call
	 * and when we get here. So this condition is not an error, and in that
	 * case ALL of the gathered data goes back onto the queue.
	 */

	int sd = GetSocket();
	assert (sd != -1);

	char output_buffer [16 * 1024];
	size_t nbytes = 0;

	while ((OutboundPages.size() > 0) && (nbytes < sizeof(output_buffer))) {
		OutboundPage *op = &(OutboundPages[0]);
		int remaining = op->Length - op->Offset;
		if ((nbytes + remaining) <= sizeof (output_buffer)) {
			// The rest of this page fits: consume it entirely and retire the page.
			memcpy (output_buffer + nbytes, op->Buffer + op->Offset, remaining);
			nbytes += remaining;
			op->Free();
			OutboundPages.pop_front();
		}
		else {
			// Only part of this page fits: take what we can and advance its offset.
			int len = sizeof(output_buffer) - nbytes;
			memcpy (output_buffer + nbytes, op->Buffer + op->Offset, len);
			op->Offset += len;
			nbytes += len;
		}
	}

	// We should never have gotten here if there were no data to write,
	// so assert that as a sanity check.
	assert (nbytes > 0);

	assert (GetSocket() != -1);
	int bytes_written = send (GetSocket(), output_buffer, nbytes, 0);

	if (bytes_written < 0) {
		if ((errno != EINPROGRESS) && (errno != EWOULDBLOCK) && (errno != EAGAIN) && (errno != EINTR)) {
			// A real error: the connection is unusable. Close() marks the
			// descriptor for deletion, and the destructor frees remaining pages.
			Close();
			return;
		}
		// Transient condition: nothing was sent. Fall through so the gathered
		// data is put back on the queue instead of being lost.
		bytes_written = 0;
	}

	OutboundDataSize -= bytes_written;

	if ((size_t)bytes_written < nbytes) {
		// Throw the unsent tail back onto the front of the queue.
		int len = nbytes - bytes_written;
		char *buffer = (char*) malloc (len + 1);
		if (!buffer)
			throw std::runtime_error ("bad alloc throwing back data");
		memcpy (buffer, output_buffer + bytes_written, len);
		buffer [len] = 0;
		OutboundPages.push_front (OutboundPage (buffer, len));
	}
}


/*******************************
ConnectionDescriptor::Heartbeat
*******************************/

void ConnectionDescriptor::Heartbeat()
{
	/* Only allow a certain amount of time to go by while waiting
	 * for a pending connect. If it expires, then kill the socket.
	 */

	if (bConnectPending) {
		if ((gCurrentLoopTime - CreatedAt) >= PendingConnectTimeout)
			bCloseNow = true;
	}
}


/**************************************
AcceptorDescriptor::AcceptorDescriptor
**************************************/

AcceptorDescriptor::AcceptorDescriptor (EventMachine_t *parent_em, int sd):
	EventableDescriptor (sd),
	MyEventMachine (parent_em)
{
	/* This is really bad and ugly. Change someday if possible.
	 * We have to know about an event-machine (probably the one that owns us),
	 * so we can pass newly-created connections to it.
	 */

	if (!MyEventMachine)
		throw std::runtime_error ("bad event-machine passed to acceptor");
}


/***************************************
AcceptorDescriptor::~AcceptorDescriptor
***************************************/

AcceptorDescriptor::~AcceptorDescriptor()
{
}


/************************
AcceptorDescriptor::Read
************************/

void AcceptorDescriptor::Read()
{
	/* Accept up to a certain number of sockets on the listening connection.
	 * Don't try to accept all that are present, because this would allow a DoS attack
	 * in which no data were ever read or written. We should accept more than one,
	 * if available, to keep the partially accepted sockets from backing up in the kernel.
	 */

	/* Make sure we use non-blocking i/o on the acceptor socket, since we're selecting it
	 * for readability. According to Stevens UNP, it's possible for an acceptor to select readable
	 * and then block when we call accept. For example, the other end resets the connection after
	 * the socket selects readable and before we call accept. The kernel will remove the dead
	 * socket from the accept queue. If the accept queue is now empty, accept will block.
	 */

	for (int i=0; i < 10; i++) {
		// addrlen is a value-result argument to accept(), so reset it every time.
		struct sockaddr_in pin;
		socklen_t addrlen = sizeof (pin);

		int sd = accept (GetSocket(), (struct sockaddr*)&pin, &addrlen);
		if (sd == -1) {
			// This breaks the loop when we've accepted everything on the kernel queue,
			// up to 10 new connections. But what if the *first* accept fails?
			// Does that mean anything serious is happening, beyond the situation
			// described in the note above?
			break;
		}

		// Set the newly-accepted socket non-blocking. If we can't even read
		// its current flags, or can't set the new ones, drop the connection.
		int val = fcntl (sd, F_GETFL, 0);
		if ((val == -1) || (fcntl (sd, F_SETFL, val | O_NONBLOCK) == -1)) {
			shutdown (sd, 1);
			close (sd);
			continue;
		}

		// Disable the Nagle algorithm. Eventually make this configurable.
		int one = 1;
		setsockopt (sd, IPPROTO_TCP, TCP_NODELAY, (const char*) &one, sizeof(one));

		ConnectionDescriptor *cd = new ConnectionDescriptor (sd);
		if (!cd)
			throw std::runtime_error ("no newly accepted connection");

		if (EventCallback) {
			(*EventCallback) (GetBinding().c_str(), EventMachine_t::CONNECTION_ACCEPTED, cd->GetBinding().c_str(), cd->GetBinding().size());
		}

		assert (MyEventMachine);
		MyEventMachine->Add (cd);
	}
}


/*************************
AcceptorDescriptor::Write
*************************/

void AcceptorDescriptor::Write()
{
	// Why are we here? A listening socket never has outbound data.
	throw std::runtime_error ("bad code path in acceptor");
}


/*****************************
AcceptorDescriptor::Heartbeat
*****************************/

void AcceptorDescriptor::Heartbeat()
{
	// No-op
}