00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020 #ifndef _PASSENGER_APPLICATION_POOL_SERVER_H_
00021 #define _PASSENGER_APPLICATION_POOL_SERVER_H_
00022
00023 #include <boost/shared_ptr.hpp>
00024 #include <boost/thread/mutex.hpp>
00025 #include <oxt/system_calls.hpp>
00026 #include <oxt/backtrace.hpp>
00027
00028 #include <sys/types.h>
00029 #include <sys/stat.h>
00030 #include <sys/wait.h>
00031 #include <sys/socket.h>
00032 #include <cstdio>
00033 #include <cstdlib>
00034 #include <limits.h>
00035 #include <errno.h>
00036 #include <unistd.h>
00037 #include <signal.h>
00038 #include <fcntl.h>
00039
00040 #include "MessageChannel.h"
00041 #include "ApplicationPool.h"
00042 #include "Application.h"
00043 #include "Exceptions.h"
00044 #include "Logging.h"
00045
00046 namespace Passenger {
00047
00048 using namespace std;
00049 using namespace boost;
00050 using namespace oxt;
00051
00052
00053
00054
00055
00056
00057
00058
00059
00060
00061
00062
00063
00064
00065
00066
00067
00068
00069
00070
00071
00072
00073
00074
00075
00076
00077
00078
00079
00080
00081
00082
00083
00084
00085
00086
00087
00088
00089
00090
00091
00092
00093
00094
00095
00096
00097
00098
00099
00100
00101
00102
00103
00104
00105
00106
00107
00108
00109
00110
00111
00112
00113
00114
00115
00116
00117
00118
00119
00120
00121
00122
00123
00124
00125
00126
00127
00128
00129
00130
00131
00132 class ApplicationPoolServer {
00133 private:
00134
00135
00136
00137
00138
00139
00140
00141 struct SharedData {
00142
00143
00144
00145
00146
00147
00148
00149 int server;
00150
00151 mutable boost::mutex lock;
00152
00153 ~SharedData() {
00154 TRACE_POINT();
00155 if (server != -1) {
00156 disconnect();
00157 }
00158 }
00159
00160
00161
00162
00163 void disconnect() {
00164 TRACE_POINT();
00165 int ret;
00166 do {
00167 ret = close(server);
00168 } while (ret == -1 && errno == EINTR);
00169 server = -1;
00170 }
00171 };
00172
00173 typedef shared_ptr<SharedData> SharedDataPtr;
00174
00175
00176
00177
00178 class RemoteSession: public Application::Session {
00179 private:
00180 SharedDataPtr data;
00181 int id;
00182 int fd;
00183 pid_t pid;
00184 public:
00185 RemoteSession(SharedDataPtr data, pid_t pid, int id, int fd) {
00186 this->data = data;
00187 this->pid = pid;
00188 this->id = id;
00189 this->fd = fd;
00190 }
00191
00192 virtual ~RemoteSession() {
00193 closeStream();
00194 boost::mutex::scoped_lock(data->lock);
00195 MessageChannel(data->server).write("close", toString(id).c_str(), NULL);
00196 }
00197
00198 virtual int getStream() const {
00199 return fd;
00200 }
00201
00202 virtual void setReaderTimeout(unsigned int msec) {
00203 MessageChannel(fd).setReadTimeout(msec);
00204 }
00205
00206 virtual void setWriterTimeout(unsigned int msec) {
00207 MessageChannel(fd).setWriteTimeout(msec);
00208 }
00209
00210 virtual void shutdownReader() {
00211 if (fd != -1) {
00212 int ret = syscalls::shutdown(fd, SHUT_RD);
00213 if (ret == -1) {
00214 throw SystemException("Cannot shutdown the writer stream",
00215 errno);
00216 }
00217 }
00218 }
00219
00220 virtual void shutdownWriter() {
00221 if (fd != -1) {
00222 int ret = syscalls::shutdown(fd, SHUT_WR);
00223 if (ret == -1) {
00224 throw SystemException("Cannot shutdown the writer stream",
00225 errno);
00226 }
00227 }
00228 }
00229
00230 virtual void closeStream() {
00231 if (fd != -1) {
00232 int ret = syscalls::close(fd);
00233 if (ret == -1) {
00234 throw SystemException("Cannot close the session stream",
00235 errno);
00236 }
00237 fd = -1;
00238 }
00239 }
00240
00241 virtual void discardStream() {
00242 fd = -1;
00243 }
00244
00245 virtual pid_t getPid() const {
00246 return pid;
00247 }
00248 };
00249
00250
00251
00252
00253
00254
00255 class Client: public ApplicationPool {
00256 private:
00257
00258
00259 SharedDataPtr dataSmartPointer;
00260 SharedData *data;
00261
00262 public:
00263
00264
00265
00266
00267
00268 Client(int sock) {
00269 dataSmartPointer = ptr(new SharedData());
00270 data = dataSmartPointer.get();
00271 data->server = sock;
00272 }
00273
00274 virtual bool connected() const {
00275 boost::mutex::scoped_lock(data->lock);
00276 return data->server != -1;
00277 }
00278
00279 virtual void clear() {
00280 MessageChannel channel(data->server);
00281 boost::mutex::scoped_lock l(data->lock);
00282 try {
00283 channel.write("clear", NULL);
00284 } catch (...) {
00285 data->disconnect();
00286 throw;
00287 }
00288 }
00289
00290 virtual void setMaxIdleTime(unsigned int seconds) {
00291 MessageChannel channel(data->server);
00292 boost::mutex::scoped_lock l(data->lock);
00293 try {
00294 channel.write("setMaxIdleTime", toString(seconds).c_str(), NULL);
00295 } catch (...) {
00296 data->disconnect();
00297 throw;
00298 }
00299 }
00300
00301 virtual void setMax(unsigned int max) {
00302 MessageChannel channel(data->server);
00303 boost::mutex::scoped_lock l(data->lock);
00304 try {
00305 channel.write("setMax", toString(max).c_str(), NULL);
00306 } catch (...) {
00307 data->disconnect();
00308 throw;
00309 }
00310 }
00311
00312 virtual unsigned int getActive() const {
00313 MessageChannel channel(data->server);
00314 boost::mutex::scoped_lock l(data->lock);
00315 vector<string> args;
00316
00317 try {
00318 channel.write("getActive", NULL);
00319 channel.read(args);
00320 return atoi(args[0].c_str());
00321 } catch (...) {
00322 data->disconnect();
00323 throw;
00324 }
00325 }
00326
00327 virtual unsigned int getCount() const {
00328 MessageChannel channel(data->server);
00329 boost::mutex::scoped_lock l(data->lock);
00330 vector<string> args;
00331
00332 try {
00333 channel.write("getCount", NULL);
00334 channel.read(args);
00335 return atoi(args[0].c_str());
00336 } catch (...) {
00337 data->disconnect();
00338 throw;
00339 }
00340 }
00341
00342 virtual void setMaxPerApp(unsigned int max) {
00343 MessageChannel channel(data->server);
00344 boost::mutex::scoped_lock l(data->lock);
00345 try {
00346 channel.write("setMaxPerApp", toString(max).c_str(), NULL);
00347 } catch (...) {
00348 data->disconnect();
00349 throw;
00350 }
00351 }
00352
00353 virtual pid_t getSpawnServerPid() const {
00354 this_thread::disable_syscall_interruption dsi;
00355 MessageChannel channel(data->server);
00356 boost::mutex::scoped_lock l(data->lock);
00357 vector<string> args;
00358
00359 try {
00360 channel.write("getSpawnServerPid", NULL);
00361 channel.read(args);
00362 return atoi(args[0].c_str());
00363 } catch (...) {
00364 data->disconnect();
00365 throw;
00366 }
00367 }
00368
00369 virtual Application::SessionPtr get(const PoolOptions &options) {
00370 this_thread::disable_syscall_interruption dsi;
00371 TRACE_POINT();
00372
00373 MessageChannel channel(data->server);
00374 boost::mutex::scoped_lock l(data->lock);
00375 vector<string> args;
00376 int stream;
00377 bool result;
00378
00379 try {
00380 vector<string> args;
00381
00382 args.push_back("get");
00383 options.toVector(args);
00384 channel.write(args);
00385 } catch (const SystemException &e) {
00386 UPDATE_TRACE_POINT();
00387 data->disconnect();
00388
00389 string message("Could not send data to the ApplicationPool server: ");
00390 message.append(e.brief());
00391 throw SystemException(message, e.code());
00392 }
00393 try {
00394 UPDATE_TRACE_POINT();
00395 result = channel.read(args);
00396 } catch (const SystemException &e) {
00397 UPDATE_TRACE_POINT();
00398 data->disconnect();
00399 throw SystemException("Could not read a message from "
00400 "the ApplicationPool server", e.code());
00401 }
00402 if (!result) {
00403 UPDATE_TRACE_POINT();
00404 data->disconnect();
00405 throw IOException("The ApplicationPool server unexpectedly "
00406 "closed the connection.");
00407 }
00408 if (args[0] == "ok") {
00409 UPDATE_TRACE_POINT();
00410 pid_t pid = (pid_t) atol(args[1]);
00411 int sessionID = atoi(args[2]);
00412
00413 try {
00414 stream = channel.readFileDescriptor();
00415 } catch (...) {
00416 UPDATE_TRACE_POINT();
00417 data->disconnect();
00418 throw;
00419 }
00420
00421 return ptr(new RemoteSession(dataSmartPointer,
00422 pid, sessionID, stream));
00423 } else if (args[0] == "SpawnException") {
00424 UPDATE_TRACE_POINT();
00425 if (args[2] == "true") {
00426 string errorPage;
00427
00428 try {
00429 result = channel.readScalar(errorPage);
00430 } catch (...) {
00431 data->disconnect();
00432 throw;
00433 }
00434 if (!result) {
00435 throw IOException("The ApplicationPool server "
00436 "unexpectedly closed the connection.");
00437 }
00438 throw SpawnException(args[1], errorPage);
00439 } else {
00440 throw SpawnException(args[1]);
00441 }
00442 } else if (args[0] == "BusyException") {
00443 UPDATE_TRACE_POINT();
00444 throw BusyException(args[1]);
00445 } else if (args[0] == "IOException") {
00446 UPDATE_TRACE_POINT();
00447 data->disconnect();
00448 throw IOException(args[1]);
00449 } else {
00450 UPDATE_TRACE_POINT();
00451 data->disconnect();
00452 throw IOException("The ApplicationPool server returned "
00453 "an unknown message: " + toString(args));
00454 }
00455 }
00456 };
00457
00458
00459 static const int SERVER_SOCKET_FD = 3;
00460
00461 string m_serverExecutable;
00462 string m_spawnServerCommand;
00463 string m_logFile;
00464 string m_rubyCommand;
00465 string m_user;
00466 string statusReportFIFO;
00467
00468
00469
00470
00471
00472
00473
00474
00475
00476 pid_t serverPid;
00477
00478
00479
00480
00481
00482
00483
00484
00485
00486 int serverSocket;
00487
00488
00489
00490
00491
00492
00493
00494
00495 void shutdownServer() {
00496 TRACE_POINT();
00497 this_thread::disable_syscall_interruption dsi;
00498 int ret;
00499 time_t begin;
00500 bool done = false;
00501
00502 syscalls::close(serverSocket);
00503 if (!statusReportFIFO.empty()) {
00504 do {
00505 ret = unlink(statusReportFIFO.c_str());
00506 } while (ret == -1 && errno == EINTR);
00507 }
00508
00509 P_TRACE(2, "Waiting for existing ApplicationPoolServerExecutable (PID " <<
00510 serverPid << ") to exit...");
00511 begin = syscalls::time(NULL);
00512 while (!done && syscalls::time(NULL) < begin + 5) {
00513
00514
00515
00516
00517
00518
00519 syscalls::kill(serverPid, SIGINT);
00520
00521 ret = syscalls::waitpid(serverPid, NULL, WNOHANG);
00522 done = ret > 0 || ret == -1;
00523 if (!done) {
00524 syscalls::usleep(100000);
00525 }
00526 }
00527 if (done) {
00528 P_TRACE(2, "ApplicationPoolServerExecutable exited.");
00529 } else {
00530 P_DEBUG("ApplicationPoolServerExecutable not exited in time. Killing it...");
00531 syscalls::kill(serverPid, SIGTERM);
00532 syscalls::waitpid(serverPid, NULL, 0);
00533 }
00534
00535 serverSocket = -1;
00536 serverPid = 0;
00537 }
00538
00539
00540
00541
00542
00543
00544
00545
00546
00547 void restartServer() {
00548 TRACE_POINT();
00549 int fds[2];
00550 pid_t pid;
00551
00552 if (serverPid != 0) {
00553 shutdownServer();
00554 }
00555
00556 if (syscalls::socketpair(AF_UNIX, SOCK_STREAM, 0, fds) == -1) {
00557 throw SystemException("Cannot create a Unix socket pair", errno);
00558 }
00559
00560 createStatusReportFIFO();
00561
00562 pid = syscalls::fork();
00563 if (pid == 0) {
00564 dup2(STDERR_FILENO, STDOUT_FILENO);
00565 dup2(fds[0], SERVER_SOCKET_FD);
00566
00567
00568 for (long i = sysconf(_SC_OPEN_MAX) - 1; i > SERVER_SOCKET_FD; i--) {
00569 close(i);
00570 }
00571
00572 execlp(
00573 #if 0
00574 "valgrind",
00575 "valgrind",
00576 #else
00577 m_serverExecutable.c_str(),
00578 #endif
00579 m_serverExecutable.c_str(),
00580 toString(Passenger::getLogLevel()).c_str(),
00581 m_spawnServerCommand.c_str(),
00582 m_logFile.c_str(),
00583 m_rubyCommand.c_str(),
00584 m_user.c_str(),
00585 statusReportFIFO.c_str(),
00586 (char *) 0);
00587 int e = errno;
00588 fprintf(stderr, "*** Passenger ERROR (%s:%d):\n"
00589 "Cannot execute %s: %s (%d)\n",
00590 __FILE__, __LINE__,
00591 m_serverExecutable.c_str(), strerror(e), e);
00592 fflush(stderr);
00593 _exit(1);
00594 } else if (pid == -1) {
00595 syscalls::close(fds[0]);
00596 syscalls::close(fds[1]);
00597 throw SystemException("Cannot create a new process", errno);
00598 } else {
00599 syscalls::close(fds[0]);
00600 serverSocket = fds[1];
00601
00602 int flags = fcntl(serverSocket, F_GETFD);
00603 if (flags != -1) {
00604 fcntl(serverSocket, F_SETFD, flags | FD_CLOEXEC);
00605 }
00606
00607 serverPid = pid;
00608 }
00609 }
00610
00611 void createStatusReportFIFO() {
00612 TRACE_POINT();
00613 char filename[PATH_MAX];
00614 int ret;
00615 mode_t permissions;
00616
00617 createPassengerTempDir();
00618
00619 if (m_user.empty()) {
00620 permissions = S_IRUSR | S_IWUSR;
00621 } else {
00622 permissions = S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH;
00623 }
00624
00625 snprintf(filename, sizeof(filename), "%s/status.fifo",
00626 getPassengerTempDir().c_str());
00627 filename[PATH_MAX - 1] = '\0';
00628 do {
00629 ret = mkfifo(filename, permissions);
00630 } while (ret == -1 && errno == EINTR);
00631 if (ret == -1 && errno != EEXIST) {
00632 int e = errno;
00633 P_WARN("*** WARNING: Could not create FIFO '" << filename <<
00634 "': " << strerror(e) << " (" << e << ")" << endl <<
00635 "Disabling Passenger ApplicationPool status reporting.");
00636 statusReportFIFO = "";
00637 } else {
00638 statusReportFIFO = filename;
00639
00640
00641
00642 do {
00643 ret = chmod(filename, permissions);
00644 } while (ret == -1 && errno == EINTR);
00645 }
00646 }
00647
00648 public:
00649
00650
00651
00652
00653
00654
00655
00656
00657
00658
00659
00660
00661
00662
00663
00664
00665
00666
00667
00668
00669
00670
00671 ApplicationPoolServer(const string &serverExecutable,
00672 const string &spawnServerCommand,
00673 const string &logFile = "",
00674 const string &rubyCommand = "ruby",
00675 const string &user = "")
00676 : m_serverExecutable(serverExecutable),
00677 m_spawnServerCommand(spawnServerCommand),
00678 m_logFile(logFile),
00679 m_rubyCommand(rubyCommand),
00680 m_user(user) {
00681 TRACE_POINT();
00682 serverSocket = -1;
00683 serverPid = 0;
00684 this_thread::disable_syscall_interruption dsi;
00685 restartServer();
00686 }
00687
00688 ~ApplicationPoolServer() {
00689 TRACE_POINT();
00690 if (serverSocket != -1) {
00691 UPDATE_TRACE_POINT();
00692 this_thread::disable_syscall_interruption dsi;
00693 shutdownServer();
00694 }
00695 }
00696
00697
00698
00699
00700
00701
00702
00703
00704
00705
00706
00707
00708
00709
00710
00711
00712
00713
00714
00715
00716
00717
00718
00719
00720
00721
00722
00723
00724
00725
00726
00727
00728
00729 ApplicationPoolPtr connect() {
00730 TRACE_POINT();
00731 try {
00732 this_thread::disable_syscall_interruption dsi;
00733 MessageChannel channel(serverSocket);
00734 int clientConnection;
00735
00736
00737 channel.writeRaw("x", 1);
00738
00739 clientConnection = channel.readFileDescriptor();
00740 return ptr(new Client(clientConnection));
00741 } catch (const SystemException &e) {
00742 throw SystemException("Could not connect to the ApplicationPool server", e.code());
00743 } catch (const IOException &e) {
00744 string message("Could not connect to the ApplicationPool server: ");
00745 message.append(e.what());
00746 throw IOException(message);
00747 }
00748 }
00749
00750
00751
00752
00753
00754
00755
00756
00757
00758
00759
00760
00761
00762
00763
00764 void detach() {
00765 TRACE_POINT();
00766 int ret;
00767 do {
00768 ret = close(serverSocket);
00769 } while (ret == -1 && errno == EINTR);
00770 serverSocket = -1;
00771 }
00772 };
00773
00774 typedef shared_ptr<ApplicationPoolServer> ApplicationPoolServerPtr;
00775
00776 }
00777
00778 #endif