00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020 #ifndef _PASSENGER_APPLICATION_H_
00021 #define _PASSENGER_APPLICATION_H_
00022
00023 #include <boost/shared_ptr.hpp>
00024 #include <boost/function.hpp>
00025 #include <oxt/system_calls.hpp>
00026 #include <oxt/backtrace.hpp>
00027 #include <string>
00028 #include <vector>
00029
00030 #include <sys/types.h>
00031 #include <sys/socket.h>
00032 #include <sys/un.h>
00033 #include <netdb.h>
00034 #include <unistd.h>
00035 #include <errno.h>
00036 #include <ctime>
00037 #include <cstring>
00038
00039 #include "MessageChannel.h"
00040 #include "Exceptions.h"
00041 #include "Logging.h"
00042 #include "Utils.h"
00043
00044 namespace Passenger {
00045
00046 using namespace std;
00047 using namespace boost;
00048
00049
00050
00051
00052
00053
00054 class Application {
00055 public:
00056 class Session;
00057
00058 typedef shared_ptr<Session> SessionPtr;
00059
00060
00061
00062
00063
00064
00065
00066
00067
00068
00069
00070
00071
00072
00073
00074
00075
00076
00077
00078
00079
00080
00081 class Session {
00082 public:
00083
00084
00085
00086 virtual ~Session() {}
00087
00088
00089
00090
00091
00092
00093
00094
00095
00096
00097
00098
00099
00100
00101
00102
00103
00104
00105
00106
00107
00108
00109
00110
00111
00112 virtual void sendHeaders(const char *headers, unsigned int size) {
00113 TRACE_POINT();
00114 int stream = getStream();
00115 if (stream == -1) {
00116 throw IOException("Cannot write headers to the request handler "
00117 "because the writer stream has already been closed.");
00118 }
00119 try {
00120 MessageChannel(stream).writeScalar(headers, size);
00121 } catch (SystemException &e) {
00122 e.setBriefMessage("An error occured while writing headers "
00123 "to the request handler");
00124 throw;
00125 }
00126 }
00127
00128
00129
00130
00131
00132
00133
00134
00135 virtual void sendHeaders(const string &headers) {
00136 sendHeaders(headers.c_str(), headers.size());
00137 }
00138
00139
00140
00141
00142
00143
00144
00145
00146
00147
00148
00149
00150
00151
00152
00153 virtual void sendBodyBlock(const char *block, unsigned int size) {
00154 TRACE_POINT();
00155 int stream = getStream();
00156 if (stream == -1) {
00157 throw IOException("Cannot write request body block to the "
00158 "request handler because the writer stream has "
00159 "already been closed.");
00160 }
00161 try {
00162 MessageChannel(stream).writeRaw(block, size);
00163 } catch (SystemException &e) {
00164 e.setBriefMessage("An error occured while sending the "
00165 "request body to the request handler");
00166 throw;
00167 }
00168 }
00169
00170
00171
00172
00173
00174
00175
00176
00177 virtual int getStream() const = 0;
00178
00179
00180
00181
00182
00183
00184
00185
00186
00187
00188 virtual void setReaderTimeout(unsigned int msec) = 0;
00189
00190
00191
00192
00193
00194
00195
00196
00197
00198
00199 virtual void setWriterTimeout(unsigned int msec) = 0;
00200
00201
00202
00203
00204
00205
00206
00207
00208 virtual void shutdownReader() = 0;
00209
00210
00211
00212
00213
00214
00215
00216
00217 virtual void shutdownWriter() = 0;
00218
00219
00220
00221
00222
00223
00224
00225 virtual void closeStream() = 0;
00226
00227
00228
00229
00230
00231 virtual void discardStream() = 0;
00232
00233
00234
00235
00236 virtual pid_t getPid() const = 0;
00237 };
00238
00239 private:
00240
00241
00242
00243 class StandardSession: public Session {
00244 protected:
00245 function<void()> closeCallback;
00246 int fd;
00247 pid_t pid;
00248
00249 public:
00250 StandardSession(pid_t pid,
00251 const function<void()> &closeCallback,
00252 int fd) {
00253 this->pid = pid;
00254 this->closeCallback = closeCallback;
00255 this->fd = fd;
00256 }
00257
00258 virtual ~StandardSession() {
00259 TRACE_POINT();
00260 closeStream();
00261 closeCallback();
00262 }
00263
00264 virtual int getStream() const {
00265 return fd;
00266 }
00267
00268 virtual void setReaderTimeout(unsigned int msec) {
00269 MessageChannel(fd).setReadTimeout(msec);
00270 }
00271
00272 virtual void setWriterTimeout(unsigned int msec) {
00273 MessageChannel(fd).setWriteTimeout(msec);
00274 }
00275
00276 virtual void shutdownReader() {
00277 TRACE_POINT();
00278 if (fd != -1) {
00279 int ret = syscalls::shutdown(fd, SHUT_RD);
00280 if (ret == -1) {
00281 throw SystemException("Cannot shutdown the writer stream",
00282 errno);
00283 }
00284 }
00285 }
00286
00287 virtual void shutdownWriter() {
00288 TRACE_POINT();
00289 if (fd != -1) {
00290 int ret = syscalls::shutdown(fd, SHUT_WR);
00291 if (ret == -1) {
00292 throw SystemException("Cannot shutdown the writer stream",
00293 errno);
00294 }
00295 }
00296 }
00297
00298 virtual void closeStream() {
00299 TRACE_POINT();
00300 if (fd != -1) {
00301 int ret = syscalls::close(fd);
00302 if (ret == -1) {
00303 throw SystemException("Cannot close the session stream",
00304 errno);
00305 }
00306 fd = -1;
00307 }
00308 }
00309
00310 virtual void discardStream() {
00311 fd = -1;
00312 }
00313
00314 virtual pid_t getPid() const {
00315 return pid;
00316 }
00317 };
00318
00319 string appRoot;
00320 pid_t pid;
00321 string listenSocketName;
00322 string listenSocketType;
00323 int ownerPipe;
00324
00325 SessionPtr connectToUnixServer(const function<void()> &closeCallback) const {
00326 TRACE_POINT();
00327 int fd, ret;
00328
00329 do {
00330 fd = socket(PF_UNIX, SOCK_STREAM, 0);
00331 } while (fd == -1 && errno == EINTR);
00332 if (fd == -1) {
00333 throw SystemException("Cannot create a new unconnected Unix socket", errno);
00334 }
00335
00336 struct sockaddr_un addr;
00337 addr.sun_family = AF_UNIX;
00338 strncpy(addr.sun_path, listenSocketName.c_str(), sizeof(addr.sun_path));
00339 addr.sun_path[sizeof(addr.sun_path) - 1] = '\0';
00340 do {
00341 ret = ::connect(fd, (const sockaddr *) &addr, sizeof(addr));
00342 } while (ret == -1 && errno == EINTR);
00343 if (ret == -1) {
00344 int e = errno;
00345 string message("Cannot connect to Unix socket '");
00346 message.append(listenSocketName);
00347 message.append("'");
00348 do {
00349 ret = close(fd);
00350 } while (ret == -1 && errno == EINTR);
00351 throw SystemException(message, e);
00352 }
00353
00354 return ptr(new StandardSession(pid, closeCallback, fd));
00355 }
00356
00357 SessionPtr connectToTcpServer(const function<void()> &closeCallback) const {
00358 TRACE_POINT();
00359 int fd, ret;
00360 vector<string> args;
00361
00362 split(listenSocketName, ':', args);
00363 if (args.size() != 2 || atoi(args[1]) == 0) {
00364 throw IOException("Invalid TCP/IP address '" + listenSocketName + "'");
00365 }
00366
00367 struct addrinfo hints, *res;
00368
00369 memset(&hints, 0, sizeof(hints));
00370 hints.ai_family = PF_INET;
00371 hints.ai_socktype = SOCK_STREAM;
00372 ret = getaddrinfo(args[0].c_str(), args[1].c_str(), &hints, &res);
00373 if (ret != 0) {
00374 int e = errno;
00375 throw IOException("Cannot resolve address '" + listenSocketName +
00376 "': " + gai_strerror(e));
00377 }
00378
00379 do {
00380 fd = socket(PF_INET, SOCK_STREAM, 0);
00381 } while (fd == -1 && errno == EINTR);
00382 if (fd == -1) {
00383 freeaddrinfo(res);
00384 throw SystemException("Cannot create a new unconnected TCP socket", errno);
00385 }
00386
00387 do {
00388 ret = ::connect(fd, res->ai_addr, res->ai_addrlen);
00389 } while (ret == -1 && errno == EINTR);
00390 freeaddrinfo(res);
00391 if (ret == -1) {
00392 int e = errno;
00393 string message("Cannot connect to TCP server '");
00394 message.append(listenSocketName);
00395 message.append("'");
00396 do {
00397 ret = close(fd);
00398 } while (ret == -1 && errno == EINTR);
00399 throw SystemException(message, e);
00400 }
00401
00402 return ptr(new StandardSession(pid, closeCallback, fd));
00403 }
00404
00405 public:
00406
00407
00408
00409
00410
00411
00412
00413
00414
00415
00416
00417
00418
00419 Application(const string &theAppRoot, pid_t pid, const string &listenSocketName,
00420 const string &listenSocketType, int ownerPipe) {
00421 appRoot = theAppRoot;
00422 this->pid = pid;
00423 this->listenSocketName = listenSocketName;
00424 this->listenSocketType = listenSocketType;
00425 this->ownerPipe = ownerPipe;
00426 P_TRACE(3, "Application " << this << ": created.");
00427 }
00428
00429 virtual ~Application() {
00430 TRACE_POINT();
00431 int ret;
00432
00433 if (ownerPipe != -1) {
00434 do {
00435 ret = close(ownerPipe);
00436 } while (ret == -1 && errno == EINTR);
00437 }
00438 if (listenSocketType == "unix") {
00439 do {
00440 ret = unlink(listenSocketName.c_str());
00441 } while (ret == -1 && errno == EINTR);
00442 }
00443 P_TRACE(3, "Application " << this << ": destroyed.");
00444 }
00445
00446
00447
00448
00449
00450 string getAppRoot() const {
00451 return appRoot;
00452 }
00453
00454
00455
00456
00457 pid_t getPid() const {
00458 return pid;
00459 }
00460
00461
00462
00463
00464
00465
00466
00467
00468
00469
00470
00471
00472
00473
00474
00475
00476
00477
00478
00479
00480
00481
00482
00483
00484
00485
00486
00487
00488
00489
00490
00491
00492
00493
00494
00495
00496
00497
00498
00499
00500
00501
00502
00503
00504
00505
00506
00507
00508
00509 SessionPtr connect(const function<void()> &closeCallback) const {
00510 TRACE_POINT();
00511 if (listenSocketType == "unix") {
00512 return connectToUnixServer(closeCallback);
00513 } else if (listenSocketType == "tcp") {
00514 return connectToTcpServer(closeCallback);
00515 } else {
00516 throw IOException("Unsupported socket type '" + listenSocketType + "'");
00517 }
00518 }
00519 };
00520
00521
00522 typedef shared_ptr<Application> ApplicationPtr;
00523
00524 }
00525
00526 #endif