00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020 #ifndef _PASSENGER_APPLICATION_POOL_SERVER_H_
00021 #define _PASSENGER_APPLICATION_POOL_SERVER_H_
00022
00023 #include <boost/shared_ptr.hpp>
00024 #include <boost/thread/mutex.hpp>
00025
00026 #include <sys/types.h>
00027 #include <sys/stat.h>
00028 #include <sys/wait.h>
00029 #include <sys/socket.h>
00030 #include <cstdio>
00031 #include <cstdlib>
00032 #include <limits.h>
00033 #include <errno.h>
00034 #include <unistd.h>
00035 #include <signal.h>
00036 #include <fcntl.h>
00037
00038 #include "MessageChannel.h"
00039 #include "ApplicationPool.h"
00040 #include "Application.h"
00041 #include "Exceptions.h"
00042 #include "Logging.h"
00043 #include "System.h"
00044
00045 namespace Passenger {
00046
00047 using namespace std;
00048 using namespace boost;
00049
00050
00051
00052
00053
00054
00055
00056
00057
00058
00059
00060
00061
00062
00063
00064
00065
00066
00067
00068
00069
00070
00071
00072
00073
00074
00075
00076
00077
00078
00079
00080
00081
00082
00083
00084
00085
00086
00087
00088
00089
00090
00091
00092
00093
00094
00095
00096
00097
00098
00099
00100
00101
00102
00103
00104
00105
00106
00107
00108
00109
00110
00111
00112
00113
00114
00115
00116
00117
00118
00119
00120
00121
00122
00123
00124
00125
00126
00127
00128
00129
00130 class ApplicationPoolServer {
00131 private:
00132
00133
00134
00135
00136
00137
00138
00139 struct SharedData {
00140
00141
00142
00143
00144 int server;
00145
00146 mutex lock;
00147
00148 ~SharedData() {
00149 int ret;
00150 do {
00151 ret = close(server);
00152 } while (ret == -1 && errno == EINTR);
00153 }
00154 };
00155
00156 typedef shared_ptr<SharedData> SharedDataPtr;
00157
00158
00159
00160
00161 class RemoteSession: public Application::Session {
00162 private:
00163 SharedDataPtr data;
00164 int id;
00165 int fd;
00166 pid_t pid;
00167 public:
00168 RemoteSession(SharedDataPtr data, pid_t pid, int id, int fd) {
00169 this->data = data;
00170 this->pid = pid;
00171 this->id = id;
00172 this->fd = fd;
00173 }
00174
00175 virtual ~RemoteSession() {
00176 closeStream();
00177 mutex::scoped_lock(data->lock);
00178 MessageChannel(data->server).write("close", toString(id).c_str(), NULL);
00179 }
00180
00181 virtual int getStream() const {
00182 return fd;
00183 }
00184
00185 virtual void shutdownReader() {
00186 if (fd != -1) {
00187 int ret = InterruptableCalls::shutdown(fd, SHUT_RD);
00188 if (ret == -1) {
00189 throw SystemException("Cannot shutdown the writer stream",
00190 errno);
00191 }
00192 }
00193 }
00194
00195 virtual void shutdownWriter() {
00196 if (fd != -1) {
00197 int ret = InterruptableCalls::shutdown(fd, SHUT_WR);
00198 if (ret == -1) {
00199 throw SystemException("Cannot shutdown the writer stream",
00200 errno);
00201 }
00202 }
00203 }
00204
00205 virtual void closeStream() {
00206 if (fd != -1) {
00207 int ret = InterruptableCalls::close(fd);
00208 if (ret == -1) {
00209 throw SystemException("Cannot close the session stream",
00210 errno);
00211 }
00212 fd = -1;
00213 }
00214 }
00215
00216 virtual void discardStream() {
00217 fd = -1;
00218 }
00219
00220 virtual pid_t getPid() const {
00221 return pid;
00222 }
00223 };
00224
00225
00226
00227
00228
00229
00230 class Client: public ApplicationPool {
00231 private:
00232
00233
00234 SharedDataPtr dataSmartPointer;
00235 SharedData *data;
00236
00237 public:
00238
00239
00240
00241
00242
00243 Client(int sock) {
00244 dataSmartPointer = ptr(new SharedData());
00245 data = dataSmartPointer.get();
00246 data->server = sock;
00247 }
00248
00249 virtual void clear() {
00250 MessageChannel channel(data->server);
00251 mutex::scoped_lock l(data->lock);
00252 channel.write("clear", NULL);
00253 }
00254
00255 virtual void setMaxIdleTime(unsigned int seconds) {
00256 MessageChannel channel(data->server);
00257 mutex::scoped_lock l(data->lock);
00258 channel.write("setMaxIdleTime", toString(seconds).c_str(), NULL);
00259 }
00260
00261 virtual void setMax(unsigned int max) {
00262 MessageChannel channel(data->server);
00263 mutex::scoped_lock l(data->lock);
00264 channel.write("setMax", toString(max).c_str(), NULL);
00265 }
00266
00267 virtual unsigned int getActive() const {
00268 MessageChannel channel(data->server);
00269 mutex::scoped_lock l(data->lock);
00270 vector<string> args;
00271
00272 channel.write("getActive", NULL);
00273 channel.read(args);
00274 return atoi(args[0].c_str());
00275 }
00276
00277 virtual unsigned int getCount() const {
00278 MessageChannel channel(data->server);
00279 mutex::scoped_lock l(data->lock);
00280 vector<string> args;
00281
00282 channel.write("getCount", NULL);
00283 channel.read(args);
00284 return atoi(args[0].c_str());
00285 }
00286
00287 virtual void setMaxPerApp(unsigned int max) {
00288 MessageChannel channel(data->server);
00289 mutex::scoped_lock l(data->lock);
00290 channel.write("setMaxPerApp", toString(max).c_str(), NULL);
00291 }
00292
00293 virtual void setUseGlobalQueue(bool value) {
00294 MessageChannel channel(data->server);
00295 boost::mutex::scoped_lock l(data->lock);
00296 channel.write("setUseGlobalQueue", value ? "true" : "false", NULL);
00297 }
00298
00299 virtual pid_t getSpawnServerPid() const {
00300 this_thread::disable_syscall_interruption dsi;
00301 MessageChannel channel(data->server);
00302 mutex::scoped_lock l(data->lock);
00303 vector<string> args;
00304
00305 channel.write("getSpawnServerPid", NULL);
00306 channel.read(args);
00307 return atoi(args[0].c_str());
00308 }
00309
00310 virtual Application::SessionPtr get(
00311 const string &appRoot,
00312 bool lowerPrivilege = true,
00313 const string &lowestUser = "nobody",
00314 const string &environment = "production",
00315 const string &spawnMethod = "smart",
00316 const string &appType = "rails"
00317 ) {
00318 this_thread::disable_syscall_interruption dsi;
00319 MessageChannel channel(data->server);
00320 mutex::scoped_lock l(data->lock);
00321 vector<string> args;
00322 int stream;
00323 bool result;
00324
00325 try {
00326 channel.write("get", appRoot.c_str(),
00327 (lowerPrivilege) ? "true" : "false",
00328 lowestUser.c_str(),
00329 environment.c_str(),
00330 spawnMethod.c_str(),
00331 appType.c_str(),
00332 NULL);
00333 } catch (const SystemException &) {
00334 throw IOException("The ApplicationPool server exited unexpectedly.");
00335 }
00336 try {
00337 result = channel.read(args);
00338 } catch (const SystemException &e) {
00339 throw SystemException("Could not read a message from "
00340 "the ApplicationPool server", e.code());
00341 }
00342 if (!result) {
00343 throw IOException("The ApplicationPool server unexpectedly "
00344 "closed the connection.");
00345 }
00346 if (args[0] == "ok") {
00347 stream = channel.readFileDescriptor();
00348 return ptr(new RemoteSession(dataSmartPointer,
00349 atoi(args[1]), atoi(args[2]), stream));
00350 } else if (args[0] == "SpawnException") {
00351 if (args[2] == "true") {
00352 string errorPage;
00353
00354 if (!channel.readScalar(errorPage)) {
00355 throw IOException("The ApplicationPool server "
00356 "unexpectedly closed the connection.");
00357 }
00358 throw SpawnException(args[1], errorPage);
00359 } else {
00360 throw SpawnException(args[1]);
00361 }
00362 } else if (args[0] == "BusyException") {
00363 throw BusyException(args[1]);
00364 } else if (args[0] == "IOException") {
00365 throw IOException(args[1]);
00366 } else {
00367 throw IOException("The ApplicationPool server returned "
00368 "an unknown message: " + toString(args));
00369 }
00370 }
00371 };
00372
00373
00374 static const int SERVER_SOCKET_FD = 3;
00375
00376 string m_serverExecutable;
00377 string m_spawnServerCommand;
00378 string m_logFile;
00379 string m_rubyCommand;
00380 string m_user;
00381 string statusReportFIFO;
00382
00383
00384
00385
00386
00387
00388
00389
00390
00391 pid_t serverPid;
00392
00393
00394
00395
00396
00397
00398
00399
00400
00401 int serverSocket;
00402
00403
00404
00405
00406
00407
00408
00409
00410 void shutdownServer() {
00411 this_thread::disable_syscall_interruption dsi;
00412 int ret;
00413 time_t begin;
00414 bool done = false;
00415
00416 InterruptableCalls::close(serverSocket);
00417 if (!statusReportFIFO.empty()) {
00418 do {
00419 ret = unlink(statusReportFIFO.c_str());
00420 } while (ret == -1 && errno == EINTR);
00421 }
00422
00423 P_TRACE(2, "Waiting for existing ApplicationPoolServerExecutable (PID " <<
00424 serverPid << ") to exit...");
00425 begin = InterruptableCalls::time(NULL);
00426 while (!done && InterruptableCalls::time(NULL) < begin + 5) {
00427
00428
00429
00430
00431
00432
00433 InterruptableCalls::kill(serverPid, SIGINT);
00434
00435 ret = InterruptableCalls::waitpid(serverPid, NULL, WNOHANG);
00436 done = ret > 0 || ret == -1;
00437 if (!done) {
00438 InterruptableCalls::usleep(100000);
00439 }
00440 }
00441 if (done) {
00442 P_TRACE(2, "ApplicationPoolServerExecutable exited.");
00443 } else {
00444 P_DEBUG("ApplicationPoolServerExecutable not exited in time. Killing it...");
00445 InterruptableCalls::kill(serverPid, SIGTERM);
00446 InterruptableCalls::waitpid(serverPid, NULL, 0);
00447 }
00448
00449 serverSocket = -1;
00450 serverPid = 0;
00451 }
00452
00453
00454
00455
00456
00457
00458
00459
00460
00461 void restartServer() {
00462 int fds[2];
00463 pid_t pid;
00464
00465 if (serverPid != 0) {
00466 shutdownServer();
00467 }
00468
00469 if (InterruptableCalls::socketpair(AF_UNIX, SOCK_STREAM, 0, fds) == -1) {
00470 throw SystemException("Cannot create a Unix socket pair", errno);
00471 }
00472
00473 createStatusReportFIFO();
00474
00475 pid = InterruptableCalls::fork();
00476 if (pid == 0) {
00477 dup2(fds[0], SERVER_SOCKET_FD);
00478
00479
00480 for (long i = sysconf(_SC_OPEN_MAX) - 1; i > SERVER_SOCKET_FD; i--) {
00481 close(i);
00482 }
00483
00484 execlp(
00485 #if 0
00486 "valgrind",
00487 "valgrind",
00488 #else
00489 m_serverExecutable.c_str(),
00490 #endif
00491 m_serverExecutable.c_str(),
00492 toString(Passenger::getLogLevel()).c_str(),
00493 m_spawnServerCommand.c_str(),
00494 m_logFile.c_str(),
00495 m_rubyCommand.c_str(),
00496 m_user.c_str(),
00497 statusReportFIFO.c_str(),
00498 NULL);
00499 int e = errno;
00500 fprintf(stderr, "*** Passenger ERROR: Cannot execute %s: %s (%d)\n",
00501 m_serverExecutable.c_str(), strerror(e), e);
00502 fflush(stderr);
00503 _exit(1);
00504 } else if (pid == -1) {
00505 InterruptableCalls::close(fds[0]);
00506 InterruptableCalls::close(fds[1]);
00507 throw SystemException("Cannot create a new process", errno);
00508 } else {
00509 InterruptableCalls::close(fds[0]);
00510 serverSocket = fds[1];
00511
00512 int flags = fcntl(serverSocket, F_GETFD);
00513 if (flags != -1) {
00514 fcntl(serverSocket, F_SETFD, flags | FD_CLOEXEC);
00515 }
00516
00517 serverPid = pid;
00518 }
00519 }
00520
00521 void createStatusReportFIFO() {
00522 char filename[PATH_MAX];
00523 int ret;
00524
00525 snprintf(filename, sizeof(filename), "/tmp/passenger_status.%d.fifo",
00526 getpid());
00527 filename[PATH_MAX - 1] = '\0';
00528 do {
00529 ret = mkfifo(filename, S_IRUSR | S_IWUSR);
00530 } while (ret == -1 && errno == EINTR);
00531 if (ret == -1 && errno != EEXIST) {
00532 int e = errno;
00533 P_WARN("*** WARNING: Could not create FIFO '" << filename <<
00534 "': " << strerror(e) << " (" << e << ")" << endl <<
00535 "Disabling Passenger ApplicationPool status reporting.");
00536 statusReportFIFO = "";
00537 } else {
00538 statusReportFIFO = filename;
00539 }
00540 }
00541
00542 public:
00543
00544
00545
00546
00547
00548
00549
00550
00551
00552
00553
00554
00555
00556
00557
00558
00559
00560
00561
00562
00563
00564
00565 ApplicationPoolServer(const string &serverExecutable,
00566 const string &spawnServerCommand,
00567 const string &logFile = "",
00568 const string &rubyCommand = "ruby",
00569 const string &user = "")
00570 : m_serverExecutable(serverExecutable),
00571 m_spawnServerCommand(spawnServerCommand),
00572 m_logFile(logFile),
00573 m_rubyCommand(rubyCommand),
00574 m_user(user) {
00575 serverSocket = -1;
00576 serverPid = 0;
00577 this_thread::disable_syscall_interruption dsi;
00578 restartServer();
00579 }
00580
00581 ~ApplicationPoolServer() {
00582 if (serverSocket != -1) {
00583 this_thread::disable_syscall_interruption dsi;
00584 shutdownServer();
00585 }
00586 }
00587
00588
00589
00590
00591
00592
00593
00594
00595
00596
00597
00598
00599
00600
00601
00602
00603
00604
00605
00606
00607
00608
00609
00610
00611
00612
00613
00614
00615
00616
00617
00618
00619
00620 ApplicationPoolPtr connect() {
00621 try {
00622 this_thread::disable_syscall_interruption dsi;
00623 MessageChannel channel(serverSocket);
00624 int clientConnection;
00625
00626
00627 channel.writeRaw("x", 1);
00628
00629 clientConnection = channel.readFileDescriptor();
00630 return ptr(new Client(clientConnection));
00631 } catch (const SystemException &e) {
00632 throw SystemException("Could not connect to the ApplicationPool server", e.code());
00633 } catch (const IOException &e) {
00634 string message("Could not connect to the ApplicationPool server: ");
00635 message.append(e.what());
00636 throw IOException(message);
00637 }
00638 }
00639
00640
00641
00642
00643
00644
00645
00646
00647
00648
00649
00650
00651
00652
00653
00654 void detach() {
00655 int ret;
00656 do {
00657 ret = close(serverSocket);
00658 } while (ret == -1 && errno == EINTR);
00659 serverSocket = -1;
00660 }
00661 };
00662
00663 typedef shared_ptr<ApplicationPoolServer> ApplicationPoolServerPtr;
00664
00665 }
00666
00667 #endif