00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020 #ifndef _PASSENGER_APPLICATION_POOL_SERVER_H_
00021 #define _PASSENGER_APPLICATION_POOL_SERVER_H_
00022
00023 #include <boost/shared_ptr.hpp>
00024 #include <boost/thread/mutex.hpp>
00025
00026 #include <sys/types.h>
00027 #include <sys/stat.h>
00028 #include <sys/wait.h>
00029 #include <sys/socket.h>
00030 #include <cstdio>
00031 #include <cstdlib>
00032 #include <limits.h>
00033 #include <errno.h>
00034 #include <unistd.h>
00035 #include <signal.h>
00036
00037 #include "MessageChannel.h"
00038 #include "ApplicationPool.h"
00039 #include "Application.h"
00040 #include "Exceptions.h"
00041 #include "Logging.h"
00042 #include "System.h"
00043
00044 namespace Passenger {
00045
00046 using namespace std;
00047 using namespace boost;
00048
00049
00050
00051
00052
00053
00054
00055
00056
00057
00058
00059
00060
00061
00062
00063
00064
00065
00066
00067
00068
00069
00070
00071
00072
00073
00074
00075
00076
00077
00078
00079
00080
00081
00082
00083
00084
00085
00086
00087
00088
00089
00090
00091
00092
00093
00094
00095
00096
00097
00098
00099
00100
00101
00102
00103
00104
00105
00106
00107
00108
00109
00110
00111
00112
00113
00114
00115
00116
00117
00118
00119
00120
00121
00122
00123
00124
00125
00126
00127
00128
00129 class ApplicationPoolServer {
00130 private:
00131
00132
00133
00134
00135
00136
00137
00138 struct SharedData {
00139
00140
00141
00142
00143 int server;
00144
00145 mutex lock;
00146
00147 ~SharedData() {
00148 int ret;
00149 do {
00150 ret = close(server);
00151 } while (ret == -1 && errno == EINTR);
00152 }
00153 };
00154
00155 typedef shared_ptr<SharedData> SharedDataPtr;
00156
00157
00158
00159
00160 class RemoteSession: public Application::Session {
00161 private:
00162 SharedDataPtr data;
00163 int id;
00164 int fd;
00165 pid_t pid;
00166 public:
00167 RemoteSession(SharedDataPtr data, pid_t pid, int id, int fd) {
00168 this->data = data;
00169 this->pid = pid;
00170 this->id = id;
00171 this->fd = fd;
00172 }
00173
00174 virtual ~RemoteSession() {
00175 closeStream();
00176 mutex::scoped_lock(data->lock);
00177 MessageChannel(data->server).write("close", toString(id).c_str(), NULL);
00178 }
00179
00180 virtual int getStream() const {
00181 return fd;
00182 }
00183
00184 virtual void shutdownReader() {
00185 if (fd != -1) {
00186 int ret = InterruptableCalls::shutdown(fd, SHUT_RD);
00187 if (ret == -1) {
00188 throw SystemException("Cannot shutdown the writer stream",
00189 errno);
00190 }
00191 }
00192 }
00193
00194 virtual void shutdownWriter() {
00195 if (fd != -1) {
00196 int ret = InterruptableCalls::shutdown(fd, SHUT_WR);
00197 if (ret == -1) {
00198 throw SystemException("Cannot shutdown the writer stream",
00199 errno);
00200 }
00201 }
00202 }
00203
00204 virtual void closeStream() {
00205 if (fd != -1) {
00206 int ret = InterruptableCalls::close(fd);
00207 if (ret == -1) {
00208 throw SystemException("Cannot close the session stream",
00209 errno);
00210 }
00211 fd = -1;
00212 }
00213 }
00214
00215 virtual void discardStream() {
00216 fd = -1;
00217 }
00218
00219 virtual pid_t getPid() const {
00220 return pid;
00221 }
00222 };
00223
00224
00225
00226
00227
00228
00229 class Client: public ApplicationPool {
00230 private:
00231
00232
00233 SharedDataPtr dataSmartPointer;
00234 SharedData *data;
00235
00236 public:
00237
00238
00239
00240
00241
00242 Client(int sock) {
00243 dataSmartPointer = ptr(new SharedData());
00244 data = dataSmartPointer.get();
00245 data->server = sock;
00246 }
00247
00248 virtual void clear() {
00249 MessageChannel channel(data->server);
00250 mutex::scoped_lock l(data->lock);
00251 channel.write("clear", NULL);
00252 }
00253
00254 virtual void setMaxIdleTime(unsigned int seconds) {
00255 MessageChannel channel(data->server);
00256 mutex::scoped_lock l(data->lock);
00257 channel.write("setMaxIdleTime", toString(seconds).c_str(), NULL);
00258 }
00259
00260 virtual void setMax(unsigned int max) {
00261 MessageChannel channel(data->server);
00262 mutex::scoped_lock l(data->lock);
00263 channel.write("setMax", toString(max).c_str(), NULL);
00264 }
00265
00266 virtual unsigned int getActive() const {
00267 MessageChannel channel(data->server);
00268 mutex::scoped_lock l(data->lock);
00269 vector<string> args;
00270
00271 channel.write("getActive", NULL);
00272 channel.read(args);
00273 return atoi(args[0].c_str());
00274 }
00275
00276 virtual unsigned int getCount() const {
00277 MessageChannel channel(data->server);
00278 mutex::scoped_lock l(data->lock);
00279 vector<string> args;
00280
00281 channel.write("getCount", NULL);
00282 channel.read(args);
00283 return atoi(args[0].c_str());
00284 }
00285
00286 virtual void setMaxPerApp(unsigned int max) {
00287 MessageChannel channel(data->server);
00288 mutex::scoped_lock l(data->lock);
00289 channel.write("setMaxPerApp", toString(max).c_str(), NULL);
00290 }
00291
00292 virtual void setUseGlobalQueue(bool value) {
00293 MessageChannel channel(data->server);
00294 boost::mutex::scoped_lock l(data->lock);
00295 channel.write("setUseGlobalQueue", value ? "true" : "false", NULL);
00296 }
00297
00298 virtual pid_t getSpawnServerPid() const {
00299 this_thread::disable_syscall_interruption dsi;
00300 MessageChannel channel(data->server);
00301 mutex::scoped_lock l(data->lock);
00302 vector<string> args;
00303
00304 channel.write("getSpawnServerPid", NULL);
00305 channel.read(args);
00306 return atoi(args[0].c_str());
00307 }
00308
00309 virtual Application::SessionPtr get(
00310 const string &appRoot,
00311 bool lowerPrivilege = true,
00312 const string &lowestUser = "nobody",
00313 const string &environment = "production",
00314 const string &spawnMethod = "smart",
00315 const string &appType = "rails"
00316 ) {
00317 this_thread::disable_syscall_interruption dsi;
00318 MessageChannel channel(data->server);
00319 mutex::scoped_lock l(data->lock);
00320 vector<string> args;
00321 int stream;
00322 bool result;
00323
00324 try {
00325 channel.write("get", appRoot.c_str(),
00326 (lowerPrivilege) ? "true" : "false",
00327 lowestUser.c_str(),
00328 environment.c_str(),
00329 spawnMethod.c_str(),
00330 appType.c_str(),
00331 NULL);
00332 } catch (const SystemException &) {
00333 throw IOException("The ApplicationPool server exited unexpectedly.");
00334 }
00335 try {
00336 result = channel.read(args);
00337 } catch (const SystemException &e) {
00338 throw SystemException("Could not read a message from "
00339 "the ApplicationPool server", e.code());
00340 }
00341 if (!result) {
00342 throw IOException("The ApplicationPool server unexpectedly "
00343 "closed the connection.");
00344 }
00345 if (args[0] == "ok") {
00346 stream = channel.readFileDescriptor();
00347 return ptr(new RemoteSession(dataSmartPointer,
00348 atoi(args[1]), atoi(args[2]), stream));
00349 } else if (args[0] == "SpawnException") {
00350 if (args[2] == "true") {
00351 string errorPage;
00352
00353 if (!channel.readScalar(errorPage)) {
00354 throw IOException("The ApplicationPool server "
00355 "unexpectedly closed the connection.");
00356 }
00357 throw SpawnException(args[1], errorPage);
00358 } else {
00359 throw SpawnException(args[1]);
00360 }
00361 } else if (args[0] == "BusyException") {
00362 throw BusyException(args[1]);
00363 } else if (args[0] == "IOException") {
00364 throw IOException(args[1]);
00365 } else {
00366 throw IOException("The ApplicationPool server returned "
00367 "an unknown message: " + toString(args));
00368 }
00369 }
00370 };
00371
00372
00373 static const int SERVER_SOCKET_FD = 3;
00374
00375 string m_serverExecutable;
00376 string m_spawnServerCommand;
00377 string m_logFile;
00378 string m_rubyCommand;
00379 string m_user;
00380 string statusReportFIFO;
00381
00382
00383
00384
00385
00386
00387
00388
00389
00390 pid_t serverPid;
00391
00392
00393
00394
00395
00396
00397
00398
00399
00400 int serverSocket;
00401
00402
00403
00404
00405
00406
00407
00408
00409 void shutdownServer() {
00410 this_thread::disable_syscall_interruption dsi;
00411 int ret;
00412 time_t begin;
00413 bool done = false;
00414
00415 InterruptableCalls::close(serverSocket);
00416 if (!statusReportFIFO.empty()) {
00417 do {
00418 ret = unlink(statusReportFIFO.c_str());
00419 } while (ret == -1 && errno == EINTR);
00420 }
00421
00422 P_TRACE(2, "Waiting for existing ApplicationPoolServerExecutable (PID " <<
00423 serverPid << ") to exit...");
00424 begin = InterruptableCalls::time(NULL);
00425 while (!done && InterruptableCalls::time(NULL) < begin + 5) {
00426
00427
00428
00429
00430
00431
00432 InterruptableCalls::kill(serverPid, SIGINT);
00433
00434 ret = InterruptableCalls::waitpid(serverPid, NULL, WNOHANG);
00435 done = ret > 0 || ret == -1;
00436 if (!done) {
00437 InterruptableCalls::usleep(100000);
00438 }
00439 }
00440 if (done) {
00441 P_TRACE(2, "ApplicationPoolServerExecutable exited.");
00442 } else {
00443 P_DEBUG("ApplicationPoolServerExecutable not exited in time. Killing it...");
00444 InterruptableCalls::kill(serverPid, SIGTERM);
00445 InterruptableCalls::waitpid(serverPid, NULL, 0);
00446 }
00447
00448 serverSocket = -1;
00449 serverPid = 0;
00450 }
00451
00452
00453
00454
00455
00456
00457
00458
00459
00460 void restartServer() {
00461 int fds[2];
00462 pid_t pid;
00463
00464 if (serverPid != 0) {
00465 shutdownServer();
00466 }
00467
00468 if (InterruptableCalls::socketpair(AF_UNIX, SOCK_STREAM, 0, fds) == -1) {
00469 throw SystemException("Cannot create a Unix socket pair", errno);
00470 }
00471
00472 createStatusReportFIFO();
00473
00474 pid = InterruptableCalls::fork();
00475 if (pid == 0) {
00476 dup2(fds[0], SERVER_SOCKET_FD);
00477
00478
00479 for (long i = sysconf(_SC_OPEN_MAX) - 1; i > SERVER_SOCKET_FD; i--) {
00480 close(i);
00481 }
00482
00483 execlp(
00484 #if 0
00485 "valgrind",
00486 "valgrind",
00487 #else
00488 m_serverExecutable.c_str(),
00489 #endif
00490 m_serverExecutable.c_str(),
00491 toString(Passenger::getLogLevel()).c_str(),
00492 m_spawnServerCommand.c_str(),
00493 m_logFile.c_str(),
00494 m_rubyCommand.c_str(),
00495 m_user.c_str(),
00496 statusReportFIFO.c_str(),
00497 NULL);
00498 int e = errno;
00499 fprintf(stderr, "*** Passenger ERROR: Cannot execute %s: %s (%d)\n",
00500 m_serverExecutable.c_str(), strerror(e), e);
00501 fflush(stderr);
00502 _exit(1);
00503 } else if (pid == -1) {
00504 InterruptableCalls::close(fds[0]);
00505 InterruptableCalls::close(fds[1]);
00506 throw SystemException("Cannot create a new process", errno);
00507 } else {
00508 InterruptableCalls::close(fds[0]);
00509 serverSocket = fds[1];
00510 serverPid = pid;
00511 }
00512 }
00513
00514 void createStatusReportFIFO() {
00515 char filename[PATH_MAX];
00516 int ret;
00517
00518 snprintf(filename, sizeof(filename), "/tmp/passenger_status.%d.fifo",
00519 getpid());
00520 filename[PATH_MAX - 1] = '\0';
00521 do {
00522 ret = mkfifo(filename, S_IRUSR | S_IWUSR);
00523 } while (ret == -1 && errno == EINTR);
00524 if (ret == -1 && errno != EEXIST) {
00525 int e = errno;
00526 P_WARN("*** WARNING: Could not create FIFO '" << filename <<
00527 "': " << strerror(e) << " (" << e << ")" << endl <<
00528 "Disabling Passenger ApplicationPool status reporting.");
00529 statusReportFIFO = "";
00530 } else {
00531 statusReportFIFO = filename;
00532 }
00533 }
00534
00535 public:
00536
00537
00538
00539
00540
00541
00542
00543
00544
00545
00546
00547
00548
00549
00550
00551
00552
00553
00554
00555
00556
00557
00558 ApplicationPoolServer(const string &serverExecutable,
00559 const string &spawnServerCommand,
00560 const string &logFile = "",
00561 const string &rubyCommand = "ruby",
00562 const string &user = "")
00563 : m_serverExecutable(serverExecutable),
00564 m_spawnServerCommand(spawnServerCommand),
00565 m_logFile(logFile),
00566 m_rubyCommand(rubyCommand),
00567 m_user(user) {
00568 serverSocket = -1;
00569 serverPid = 0;
00570 this_thread::disable_syscall_interruption dsi;
00571 restartServer();
00572 }
00573
00574 ~ApplicationPoolServer() {
00575 if (serverSocket != -1) {
00576 this_thread::disable_syscall_interruption dsi;
00577 shutdownServer();
00578 }
00579 }
00580
00581
00582
00583
00584
00585
00586
00587
00588
00589
00590
00591
00592
00593
00594
00595
00596
00597
00598
00599
00600
00601
00602
00603
00604
00605
00606
00607
00608
00609
00610
00611
00612
00613 ApplicationPoolPtr connect() {
00614 try {
00615 this_thread::disable_syscall_interruption dsi;
00616 MessageChannel channel(serverSocket);
00617 int clientConnection;
00618
00619
00620 channel.writeRaw("x", 1);
00621
00622 clientConnection = channel.readFileDescriptor();
00623 return ptr(new Client(clientConnection));
00624 } catch (const SystemException &e) {
00625 throw SystemException("Could not connect to the ApplicationPool server", e.code());
00626 } catch (const IOException &e) {
00627 string message("Could not connect to the ApplicationPool server: ");
00628 message.append(e.what());
00629 throw IOException(message);
00630 }
00631 }
00632
00633
00634
00635
00636
00637
00638
00639
00640
00641
00642
00643
00644
00645
00646
00647 void detach() {
00648 int ret;
00649 do {
00650 ret = close(serverSocket);
00651 } while (ret == -1 && errno == EINTR);
00652 serverSocket = -1;
00653 }
00654 };
00655
00656 typedef shared_ptr<ApplicationPoolServer> ApplicationPoolServerPtr;
00657
00658 }
00659
00660 #endif