/*****************************************************************************

$Id$

File:     pipe.cpp
Date:     30May07

Copyright (C) 2006-07 by Francis Cianfrocca. All Rights Reserved.
Gmail: blackhedd

This program is free software; you can redistribute it and/or modify
it under the terms of either: 1) the GNU General Public License
as published by the Free Software Foundation; either version 2 of the
License, or (at your option) any later version; or 2) Ruby's License.

See the file COPYING for complete licensing information.

*****************************************************************************/

#include "project.h"


#ifdef OS_UNIX
// THIS ENTIRE FILE IS ONLY COMPILED ON UNIX-LIKE SYSTEMS.


/******************************
PipeDescriptor::PipeDescriptor
******************************/

/* Wraps descriptor fd, which talks to the subprocess subpid.
 * The pid is kept so the destructor can reap the child.
 * Read interest is registered immediately: through the epoll event mask,
 * or by arming a kqueue reader.
 */
PipeDescriptor::PipeDescriptor (int fd, pid_t subpid, EventMachine_t *parent_em):
	EventableDescriptor (fd, parent_em),
	bReadAttemptedAfterClose (false),
	OutboundDataSize (0),
	SubprocessPid (subpid)
{
	#ifdef HAVE_EPOLL
	EpollEvent.events = EPOLLIN;
	#endif
	#ifdef HAVE_KQUEUE
	MyEventMachine->ArmKqueueReader (this);
	#endif
}


/* Polls waitpid(pid, status, WNOHANG) up to `attempts` times, 0.05s apart.
 * With sleep_first the sleep precedes each poll; otherwise it follows it.
 *
 * Returns true as soon as waitpid returns nonzero. That is either the child
 * was reaped (its status is written through `status`), or waitpid failed
 * (e.g. ECHILD), which the destructor treats as "nothing left to reap".
 * Returns false if the child is still running after every attempt.
 */
static bool pipe_poll_reap (pid_t pid, int *status, int attempts, bool sleep_first)
{
	struct timespec req = {0, 50000000}; // 0.05s
	for (int n = 0; n < attempts; n++) {
		if (sleep_first)
			nanosleep (&req, NULL);
		if (waitpid (pid, status, WNOHANG) != 0)
			return true;
		if (!sleep_first)
			nanosleep (&req, NULL);
	}
	return false;
}


/*******************************
PipeDescriptor::~PipeDescriptor
*******************************/

PipeDescriptor::~PipeDescriptor()
{
	// Run down any stranded outbound data.
	for (size_t i=0; i < OutboundPages.size(); i++)
		OutboundPages[i].Free();

	/* As a virtual destructor, we come here before the base-class
	 * destructor that closes our file-descriptor.
	 * We have to make sure the subprocess goes down (if it's not
	 * already down) and we have to reap the zombie.
	 *
	 * This implementation is PROVISIONAL and will surely be improved.
	 * The intention here is that we never block, hence the highly
	 * undesirable sleeps. But if we can't reap the subprocess even
	 * after sending it SIGKILL, then something is wrong and we
	 * throw a fatal exception, which is also not something we should
	 * be doing.
	 *
	 * Eventually the right thing to do will be to have the reactor
	 * core respond to SIGCHLD by chaining a handler on top of the
	 * one Ruby may have installed, and dealing with a list of dead
	 * children that are pending cleanup.
	 *
	 * Since we want to have a signal processor integrated into the
	 * client-visible API, let's wait until that is done before cleaning
	 * this up.
	 *
	 * Added a very ugly hack to support passing the subprocess's exit
	 * status to the user. It only makes logical sense for user code to access
	 * the subprocess exit status in the unbind callback. But unbind is called
	 * back during the EventableDescriptor destructor. So by that time there's
	 * no way to call back this object through an object binding, because it's
	 * already been cleaned up. We might have added a parameter to the unbind
	 * callback, but that would probably break a huge amount of existing code.
	 * So the hack-solution is to define an instance variable in the EventMachine
	 * object and stick the exit status in there, where it can easily be accessed
	 * with an accessor visible to user code.
	 * User code should ONLY access the exit status from within the unbind callback.
	 * Otherwise there's no guarantee it'll be valid.
	 * This hack won't make it impossible to run multiple EventMachines in a single
	 * process, but it will make it impossible to reliably nest unbind calls
	 * within other unbind calls. (Not sure if that's even possible.)
	 */

	assert (MyEventMachine);

	/* Another hack to make the SubprocessPid available to get_subprocess_status */
	MyEventMachine->SubprocessPid = SubprocessPid;

	/* A non-positive pid does not name a single child.
	 * waitpid() would reap any child in our process group (0) or any child
	 * at all (-1). kill() would signal our whole process group (0) or every
	 * process we are permitted to signal (-1).
	 * Never let such a value reach the escalation below.
	 */
	if (SubprocessPid <= 0)
		return;

	int *status = &(MyEventMachine->SubprocessExitStatus);

	/* 01Mar09: Updated to use a small nanosleep in a loop. When nanosleep is
	 * interrupted by SIGCHLD, it resumes the system call after processing the
	 * signal (resulting in unnecessary latency). Calling nanosleep in a loop
	 * avoids this problem.
	 */

	// wait 0.5s for the process to die
	if (pipe_poll_reap (SubprocessPid, status, 10, false))
		return;

	// send SIGTERM and wait another 1s
	kill (SubprocessPid, SIGTERM);
	if (pipe_poll_reap (SubprocessPid, status, 20, true))
		return;

	// send SIGKILL and wait another 5s
	kill (SubprocessPid, SIGKILL);
	if (pipe_poll_reap (SubprocessPid, status, 100, true))
		return;

	// still not dead, give up!
	// NOTE(review): since C++11 a destructor is implicitly noexcept unless it
	// (or a base/member destructor) is declared otherwise. In that case this
	// throw ends in std::terminate rather than reaching any caller.
	throw std::runtime_error ("unable to reap subprocess");
}



/********************
PipeDescriptor::Read
********************/

/* Drains readable data from the pipe and hands it to user code.
 * Reads at most 10 chunks per call, so other descriptors get a turn.
 * If the descriptor selected readable but nothing at all was read, the
 * descriptor is scheduled to close.
 */
void PipeDescriptor::Read()
{
	int sd = GetSocket();
	if (sd == INVALID_SOCKET) {
		assert (!bReadAttemptedAfterClose);
		bReadAttemptedAfterClose = true;
		return;
	}

	LastActivity = MyEventMachine->GetCurrentLoopTime();

	int total_bytes_read = 0;
	char readbuffer [16 * 1024];

	for (int i=0; i < 10; i++) {
		// Don't read just one buffer and then move on. This is faster
		// if there is a lot of incoming.
		// But don't read indefinitely. Give other sockets a chance to run.
		// NOTICE, we're reading one less than the buffer size.
		// That's so we can put a guard byte at the end of what we send
		// to user code.
		// Use read instead of recv, which on Linux gives a "socket operation
		// on nonsocket" error.

		int r = read (sd, readbuffer, sizeof(readbuffer) - 1);

		if (r > 0) {
			total_bytes_read += r;

			// Add a null-terminator at the end of the buffer
			// that we will send to the callback.
			// DO NOT EVER CHANGE THIS. We want to explicitly allow users
			// to be able to depend on this behavior, so they will have
			// the option to do some things faster. Additionally it's
			// a security guard against buffer overflows.
			readbuffer [r] = 0;
			_GenericInboundDispatch(readbuffer, r);
		}
		else if (r == 0) {
			// End of file: the writer has closed its end.
			break;
		}
		else {
			// Basically a would-block, meaning we've read everything there is to read.
			break;
		}
	}


	if (total_bytes_read == 0) {
		// If we read no data on a descriptor that selected readable,
		// it generally means the other end closed the connection gracefully.
		// NOTE(review): any read error on the first attempt, including a
		// would-block, also lands here and schedules a close.
		ScheduleClose (false);
		//bCloseNow = true;
	}

}

/*********************
PipeDescriptor::Write
*********************/

/* Writes up to 16 KiB of queued outbound data in a single write() call.
 *
 * Data is gathered from OutboundPages into a stack buffer first. Whatever
 * write() does not accept is pushed back onto the head of the queue.
 * That covers both a short write and a transient failure
 * (EAGAIN/EWOULDBLOCK/EINTR/EINPROGRESS), so queued data is never lost.
 * Any other error closes the descriptor.
 */
void PipeDescriptor::Write()
{
	int sd = GetSocket();
	assert (sd != INVALID_SOCKET);

	LastActivity = MyEventMachine->GetCurrentLoopTime();
	char output_buffer [16 * 1024];
	size_t nbytes = 0;

	// Gather as much queued data as fits into output_buffer.
	// Pages that fit completely are released here. A page that only partly
	// fits has its Offset advanced past the bytes taken.
	while ((OutboundPages.size() > 0) && (nbytes < sizeof(output_buffer))) {
		OutboundPage *op = &(OutboundPages[0]);
		if ((nbytes + op->Length - op->Offset) <= sizeof (output_buffer)) {
			// "<=" so that a page which exactly fills the buffer is released
			// now rather than lingering in the queue as an empty page.
			memcpy (output_buffer + nbytes, op->Buffer + op->Offset, op->Length - op->Offset);
			nbytes += (op->Length - op->Offset);
			op->Free();
			OutboundPages.pop_front();
		}
		else {
			int len = sizeof(output_buffer) - nbytes;
			memcpy (output_buffer + nbytes, op->Buffer + op->Offset, len);
			op->Offset += len;
			nbytes += len;
		}
	}

	// We should never have gotten here if there were no data to write,
	// so assert that as a sanity check.
	// Don't bother to make sure nbytes is less than output_buffer because
	// if it were we probably would have crashed already.
	assert (nbytes > 0);

	assert (GetSocket() != INVALID_SOCKET);
	int bytes_written = write (GetSocket(), output_buffer, nbytes);
	#ifdef OS_WIN32
	int e = WSAGetLastError();
	#else
	int e = errno;
	#endif

	if (bytes_written <= 0) {
		bool transient = false;
		#ifdef OS_UNIX
		// POSIX reports EAGAIN for a full non-blocking pipe; on some systems
		// it is a different value from EWOULDBLOCK, so accept both.
		transient = (e == EINPROGRESS) || (e == EWOULDBLOCK) || (e == EAGAIN) || (e == EINTR);
		#endif
		#ifdef OS_WIN32
		transient = (e == WSAEINPROGRESS) || (e == WSAEWOULDBLOCK);
		#endif
		if (!transient) {
			Close();
			return;
		}
		// Nothing was written. Treat it as a zero-length write so that the
		// whole gathered buffer is returned to the queue below, rather than
		// being dropped along with the pages it was copied from.
		bytes_written = 0;
	}

	OutboundDataSize -= bytes_written;
	if ((size_t)bytes_written < nbytes) {
		// Put the unwritten tail at the head of the queue. It therefore goes
		// out first next time, ahead of any partially consumed page.
		int len = nbytes - bytes_written;
		char *buffer = (char*) malloc (len + 1);
		if (!buffer)
			throw std::runtime_error ("bad alloc throwing back data");
		memcpy (buffer, output_buffer + bytes_written, len);
		buffer [len] = 0;
		OutboundPages.push_front (OutboundPage (buffer, len));
	}

	#ifdef HAVE_EPOLL
	if (bytes_written > 0) {
		// Keep write interest only while there is still something to send.
		EpollEvent.events = EPOLLIN;
		if (SelectForWrite())
			EpollEvent.events |= EPOLLOUT;
		assert (MyEventMachine);
		MyEventMachine->Modify (this);
	}
	#endif
}


/*************************
PipeDescriptor::Heartbeat
*************************/

/* Schedules a close if the descriptor has been idle for at least
 * InactivityTimeout. A zero timeout disables the check.
 */
void PipeDescriptor::Heartbeat()
{
	// If an inactivity timeout is defined, then check for it.
	if (InactivityTimeout && ((MyEventMachine->GetCurrentLoopTime() - LastActivity) >= InactivityTimeout))
		ScheduleClose (false);
		//bCloseNow = true;
}


/*****************************
PipeDescriptor::SelectForRead
*****************************/

bool PipeDescriptor::SelectForRead()
{
	/* Pipe descriptors, being local by definition, don't have
	 * a pending state, so this is simpler than for the
	 * ConnectionDescriptor object.
	 * Read interest is wanted whenever the descriptor is not paused.
	 */
	return !bPaused;
}

/******************************
PipeDescriptor::SelectForWrite
******************************/

bool PipeDescriptor::SelectForWrite()
{
	/* Pipe descriptors, being local by definition, don't have
	 * a pending state, so this is simpler than for the
	 * ConnectionDescriptor object.
	 * Write interest is wanted only when data is queued and we're not paused.
	 */
	return (GetOutboundDataSize() > 0) && !bPaused;
}




/********************************
PipeDescriptor::SendOutboundData
********************************/

/* Queues a private, NUL-terminated copy of data[0..length) for sending.
 * Returns the number of bytes queued. Returns 0 if a close is already
 * scheduled or there is nothing to send.
 * Throws std::runtime_error on a NULL data pointer with nonzero length,
 * or on allocation failure.
 */
int PipeDescriptor::SendOutboundData (const char *data, unsigned long length)
{
	//if (bCloseNow || bCloseAfterWriting)
	if (IsCloseScheduled())
		return 0;

	if (!data && (length > 0))
		throw std::runtime_error ("bad outbound data");

	// An empty page would arm write interest (EPOLLOUT below) with nothing
	// to send. A subsequent Write() would then gather zero bytes and trip
	// its assert (nbytes > 0).
	if (length == 0)
		return 0;

	char *buffer = (char *) malloc (length + 1);
	if (!buffer)
		throw std::runtime_error ("no allocation for outbound data");
	memcpy (buffer, data, length);
	buffer [length] = 0;
	OutboundPages.push_back (OutboundPage (buffer, length));
	OutboundDataSize += length;
	#ifdef HAVE_EPOLL
	EpollEvent.events = (EPOLLIN | EPOLLOUT);
	assert (MyEventMachine);
	MyEventMachine->Modify (this);
	#endif
	return length;
}

/********************************
PipeDescriptor::GetSubprocessPid
********************************/

/* Stores the subprocess pid into *pid and returns true.
 * Returns false (leaving *pid untouched) when pid is NULL or no positive
 * subprocess pid is known.
 */
bool PipeDescriptor::GetSubprocessPid (pid_t *pid)
{
	if (!pid || (SubprocessPid <= 0))
		return false;

	*pid = SubprocessPid;
	return true;
}


#endif // OS_UNIX