ext/common/agents/HelperAgent/Main.cpp in passenger-4.0.60 vs ext/common/agents/HelperAgent/Main.cpp in passenger-5.0.0.beta1
- old
+ new
@@ -21,642 +21,1054 @@
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE.
*/
+#ifndef _GNU_SOURCE
+ #define _GNU_SOURCE
+#endif
+#ifdef __linux__
+ #define SUPPORTS_PER_THREAD_CPU_AFFINITY
+ #include <sched.h>
+ #include <pthread.h>
+#endif
+
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/stat.h>
#include <sys/un.h>
#include <cstring>
#include <cassert>
#include <cerrno>
#include <stdlib.h>
#include <unistd.h>
#include <limits.h>
+#include <fcntl.h>
#include <pwd.h>
#include <grp.h>
#include <set>
#include <vector>
#include <string>
+#include <algorithm>
#include <iostream>
#include <sstream>
#include <boost/thread.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/make_shared.hpp>
+#include <boost/atomic.hpp>
#include <oxt/thread.hpp>
#include <oxt/system_calls.hpp>
#include <ev++.h>
+#include <agents/HelperAgent/OptionParser.h>
#include <agents/HelperAgent/RequestHandler.h>
-#include <agents/HelperAgent/RequestHandler.cpp>
-#include <agents/HelperAgent/AgentOptions.h>
-#include <agents/HelperAgent/SystemMetricsTool.cpp>
-
+#include <agents/HelperAgent/AdminServer.h>
#include <agents/Base.h>
#include <Constants.h>
+#include <ServerKit/Server.h>
+#include <ServerKit/AcceptLoadBalancer.h>
#include <ApplicationPool2/Pool.h>
#include <MessageServer.h>
#include <MessageReadersWriters.h>
#include <FileDescriptor.h>
#include <ResourceLocator.h>
#include <BackgroundEventLoop.cpp>
-#include <ServerInstanceDir.h>
#include <UnionStation/Core.h>
#include <Exceptions.h>
-#include <MultiLibeio.cpp>
#include <Utils.h>
#include <Utils/Timer.h>
#include <Utils/IOUtils.h>
+#include <Utils/json.h>
#include <Utils/MessageIO.h>
#include <Utils/VariantMap.h>
using namespace boost;
using namespace oxt;
using namespace Passenger;
using namespace Passenger::ApplicationPool2;
-class RemoteController: public MessageServer::Handler {
-private:
- struct SpecificContext: public MessageServer::ClientContext {
+/***** Structures, constants, global variables and forward declarations *****/
+
+namespace Passenger {
+namespace ServerAgent {
+ struct ThreadWorkingObjects {
+ BackgroundEventLoop *bgloop;
+ ServerKit::Context *serverKitContext;
+ RequestHandler *requestHandler;
+
+ ThreadWorkingObjects()
+ : bgloop(NULL),
+ serverKitContext(NULL),
+ requestHandler(NULL)
+ { }
};
- typedef MessageServer::CommonClientContext CommonClientContext;
+ struct AdminWorkingObjects {
+ BackgroundEventLoop *bgloop;
+ ServerKit::Context *serverKitContext;
+ AdminServer *adminServer;
- boost::shared_ptr<RequestHandler> requestHandler;
- PoolPtr pool;
+ AdminWorkingObjects()
+ : bgloop(NULL),
+ serverKitContext(NULL),
+ adminServer(NULL)
+ { }
+ };
+ struct WorkingObjects {
+ int serverFds[SERVER_KIT_MAX_SERVER_ENDPOINTS];
+ int adminServerFds[SERVER_KIT_MAX_SERVER_ENDPOINTS];
+ string password;
+ vector<ServerAgent::AdminServer::Authorization> adminAuthorizations;
- /*********************************************
- * Message handler methods
- *********************************************/
+ ResourceLocator resourceLocator;
+ RandomGeneratorPtr randomGenerator;
+ UnionStation::CorePtr unionStationCore;
+ SpawnerConfigPtr spawnerConfig;
+ SpawnerFactoryPtr spawnerFactory;
+ PoolPtr appPool;
- void processDetachProcess(CommonClientContext &commonContext, SpecificContext *specificContext,
- const vector<string> &args)
- {
- TRACE_POINT();
- commonContext.requireRights(Account::DETACH);
- if (pool->detachProcess((pid_t) atoi(args[1]))) {
- writeArrayMessage(commonContext.fd, "true", NULL);
- } else {
- writeArrayMessage(commonContext.fd, "false", NULL);
+ ServerKit::AcceptLoadBalancer<RequestHandler> loadBalancer;
+ vector<ThreadWorkingObjects> threadWorkingObjects;
+ struct ev_signal sigintWatcher;
+ struct ev_signal sigtermWatcher;
+ struct ev_signal sigquitWatcher;
+
+ AdminWorkingObjects adminWorkingObjects;
+
+ EventFd exitEvent;
+ EventFd allClientsDisconnectedEvent;
+ unsigned int terminationCount;
+ boost::atomic<unsigned int> shutdownCounter;
+ oxt::thread *prestarterThread;
+
+ WorkingObjects()
+ : terminationCount(0),
+ shutdownCounter(0)
+ {
+ for (unsigned int i = 0; i < SERVER_KIT_MAX_SERVER_ENDPOINTS; i++) {
+ serverFds[i] = -1;
+ adminServerFds[i] = -1;
+ }
}
- }
+ };
+} // namespace ServerAgent
+} // namespace Passenger
- void processDetachProcessByKey(CommonClientContext &commonContext, SpecificContext *specificContext,
- const vector<string> &args)
- {
- TRACE_POINT();
- commonContext.requireRights(Account::DETACH);
- // TODO: implement this
- writeArrayMessage(commonContext.fd, "false", NULL);
+using namespace Passenger::ServerAgent;
+
+static VariantMap *agentsOptions;
+static WorkingObjects *workingObjects;
+
+
+/***** Server stuff *****/
+
+static void waitForExitEvent();
+static void cleanup();
+static void deletePidFile();
+static void abortLongRunningConnections(const ApplicationPool2::ProcessPtr &process);
+static void requestHandlerShutdownFinished(RequestHandler *server);
+static void adminServerShutdownFinished(ServerAgent::AdminServer *server);
+static void printInfoInThread();
+
+static void
+parseAndAddAdminAuthorization(const string &description) {
+ TRACE_POINT();
+ WorkingObjects *wo = workingObjects;
+ ServerAgent::AdminServer::Authorization auth;
+ vector<string> args;
+
+ split(description, ':', args);
+
+ if (args.size() == 2) {
+ auth.level = ServerAgent::AdminServer::FULL;
+ auth.username = args[0];
+ auth.password = strip(readAll(args[1]));
+ } else if (args.size() == 3) {
+ auth.level = ServerAgent::AdminServer::parseLevel(args[0]);
+ auth.username = args[1];
+ auth.password = strip(readAll(args[2]));
+ } else {
+ P_BUG("Too many elements in authorization description");
}
- bool processInspect(CommonClientContext &commonContext, SpecificContext *specificContext,
- const vector<string> &args)
- {
- TRACE_POINT();
- commonContext.requireRights(Account::INSPECT_BASIC_INFO);
- if ((args.size() - 1) % 2 != 0) {
- return false;
- }
+ wo->adminAuthorizations.push_back(auth);
+}
- VariantMap options = argsToOptions(args);
- writeScalarMessage(commonContext.fd, pool->inspect(Pool::InspectOptions(options)));
- return true;
+static void
+initializePrivilegedWorkingObjects() {
+ TRACE_POINT();
+ const VariantMap &options = *agentsOptions;
+ WorkingObjects *wo = workingObjects = new WorkingObjects();
+
+ wo->prestarterThread = NULL;
+
+ wo->password = options.get("server_password", false);
+ if (wo->password == "-") {
+ wo->password.clear();
+ } else if (wo->password.empty() && options.has("server_password_file")) {
+ wo->password = strip(readAll(options.get("server_password_file")));
}
- void processToXml(CommonClientContext &commonContext, SpecificContext *specificContext,
- const vector<string> &args)
- {
- TRACE_POINT();
- commonContext.requireRights(Account::INSPECT_BASIC_INFO);
- bool includeSensitiveInfo =
- commonContext.account->hasRights(Account::INSPECT_SENSITIVE_INFO) &&
- args[1] == "true";
- writeScalarMessage(commonContext.fd, pool->toXml(includeSensitiveInfo));
+ vector<string> authorizations = options.getStrSet("server_authorizations",
+ false);
+ string description;
+
+ UPDATE_TRACE_POINT();
+ foreach (description, authorizations) {
+ parseAndAddAdminAuthorization(description);
}
+}
- void processBacktraces(CommonClientContext &commonContext, SpecificContext *specificContext,
- const vector<string> &args)
- {
- TRACE_POINT();
- commonContext.requireRights(Account::INSPECT_BACKTRACES);
- writeScalarMessage(commonContext.fd, oxt::thread::all_backtraces());
+static void
+initializeSingleAppMode() {
+ TRACE_POINT();
+ VariantMap &options = *agentsOptions;
+
+ if (options.getBool("multi_app")) {
+ P_NOTICE(AGENT_EXE " server running in multi-application mode.");
+ return;
}
- void processRestartAppGroup(CommonClientContext &commonContext, SpecificContext *specificContext,
- const vector<string> &args)
- {
- TRACE_POINT();
- commonContext.requireRights(Account::RESTART);
- VariantMap options = argsToOptions(args, 2);
- RestartMethod method = RM_DEFAULT;
- if (options.get("method", false) == "blocking") {
- method = RM_BLOCKING;
- } else if (options.get("method", false) == "rolling") {
- method = RM_ROLLING;
+ if (!options.has("app_type")) {
+ P_DEBUG("Autodetecting application type...");
+ AppTypeDetector detector(NULL, 0);
+ PassengerAppType appType = detector.checkAppRoot(options.get("app_root"));
+ if (appType == PAT_NONE || appType == PAT_ERROR) {
+ fprintf(stderr, "ERROR: unable to autodetect what kind of application "
+ "lives in %s. Please specify information about the app using "
+ "--app-type and --startup-file, or specify a correct location to "
+ "the application you want to serve.\n"
+ "Type '" AGENT_EXE " server --help' for more information.\n",
+ options.get("app_root").c_str());
+ exit(1);
}
- bool result = pool->restartGroupByName(args[1], method);
- writeArrayMessage(commonContext.fd, result ? "true" : "false", NULL);
- }
- void processRequests(CommonClientContext &commonContext, SpecificContext *specificContext,
- const vector<string> &args)
- {
- TRACE_POINT();
- stringstream stream;
- commonContext.requireRights(Account::INSPECT_REQUESTS);
- requestHandler->inspect(stream);
- writeScalarMessage(commonContext.fd, stream.str());
+ options.set("app_type", getAppTypeName(appType));
+ options.set("startup_file", getAppTypeStartupFile(appType));
}
-public:
- RemoteController(const boost::shared_ptr<RequestHandler> &requestHandler, const PoolPtr &pool) {
- this->requestHandler = requestHandler;
- this->pool = pool;
- }
+ P_NOTICE(AGENT_EXE " server running in single-application mode.");
+ P_NOTICE("Serving app : " << options.get("app_root"));
+ P_NOTICE("App type : " << options.get("app_type"));
+ P_NOTICE("App startup file: " << options.get("startup_file"));
+}
- virtual MessageServer::ClientContextPtr newClient(CommonClientContext &commonContext) {
- return boost::make_shared<SpecificContext>();
- }
+static void
+makeFileWorldReadableAndWritable(const string &path) {
+ int ret;
- virtual bool processMessage(CommonClientContext &commonContext,
- MessageServer::ClientContextPtr &_specificContext,
- const vector<string> &args)
- {
- SpecificContext *specificContext = (SpecificContext *) _specificContext.get();
- try {
- if (isCommand(args, "detach_process", 1)) {
- processDetachProcess(commonContext, specificContext, args);
- } else if (isCommand(args, "detach_process_by_key", 1)) {
- processDetachProcessByKey(commonContext, specificContext, args);
- } else if (args[0] == "inspect") {
- return processInspect(commonContext, specificContext, args);
- } else if (isCommand(args, "toXml", 1)) {
- processToXml(commonContext, specificContext, args);
- } else if (isCommand(args, "backtraces", 0)) {
- processBacktraces(commonContext, specificContext, args);
- } else if (isCommand(args, "restart_app_group", 1, 99)) {
- processRestartAppGroup(commonContext, specificContext, args);
- } else if (isCommand(args, "requests", 0)) {
- processRequests(commonContext, specificContext, args);
- } else {
- return false;
- }
- } catch (const SecurityException &) {
- /* Client does not have enough rights to perform a certain action.
- * It has already been notified of this; ignore exception and move on.
- */
+ do {
+ ret = chmod(path.c_str(), parseModeString("u=rw,g=rw,o=rw"));
+ } while (ret == -1 && errno == EINTR);
+}
+
+static void
+startListening() {
+ TRACE_POINT();
+ WorkingObjects *wo = workingObjects;
+ vector<string> addresses = agentsOptions->getStrSet("server_addresses");
+ vector<string> adminAddresses = agentsOptions->getStrSet("server_admin_addresses", false);
+
+ for (unsigned int i = 0; i < addresses.size(); i++) {
+ wo->serverFds[i] = createServer(addresses[i]);
+ if (getSocketAddressType(addresses[i]) == SAT_UNIX) {
+ makeFileWorldReadableAndWritable(parseUnixSocketAddress(addresses[i]));
}
- return true;
}
-};
+ for (unsigned int i = 0; i < adminAddresses.size(); i++) {
+ wo->adminServerFds[i] = createServer(adminAddresses[i]);
+ if (getSocketAddressType(adminAddresses[i]) == SAT_UNIX) {
+ makeFileWorldReadableAndWritable(parseUnixSocketAddress(adminAddresses[i]));
+ }
+ }
+}
-class ExitHandler: public MessageServer::Handler {
-private:
- EventFd &exitEvent;
+static void
+createPidFile() {
+ TRACE_POINT();
+ string pidFile = agentsOptions->get("server_pid_file", false);
+ if (!pidFile.empty()) {
+ char pidStr[32];
-public:
- ExitHandler(EventFd &_exitEvent)
- : exitEvent(_exitEvent)
- { }
+ snprintf(pidStr, sizeof(pidStr), "%lld", (long long) getpid());
- virtual bool processMessage(MessageServer::CommonClientContext &commonContext,
- MessageServer::ClientContextPtr &handlerSpecificContext,
- const vector<string> &args)
- {
- if (args[0] == "exit") {
- TRACE_POINT();
- commonContext.requireRights(Account::EXIT);
- UPDATE_TRACE_POINT();
- exitEvent.notify();
- UPDATE_TRACE_POINT();
- writeArrayMessage(commonContext.fd, "exit command received", NULL);
- return true;
- } else {
- return false;
+ int fd = syscalls::open(pidFile.c_str(), O_WRONLY | O_CREAT | O_TRUNC, 0644);
+ if (fd == -1) {
+ int e = errno;
+ throw FileSystemException("Cannot create PID file " + pidFile, e, pidFile);
}
+
+ UPDATE_TRACE_POINT();
+ writeExact(fd, pidStr, strlen(pidStr));
+ syscalls::close(fd);
}
-};
+}
-/**
- * A representation of the Server responsible for handling Client instances.
- *
- * @see Client
- */
-class Server {
-private:
- static const int MESSAGE_SERVER_THREAD_STACK_SIZE = 128 * 1024;
- static const int EVENT_LOOP_THREAD_STACK_SIZE = 256 * 1024;
+static void
+lowerPrivilege() {
+ TRACE_POINT();
+}
- FileDescriptor feedbackFd;
- const AgentOptions &options;
+static void
+printInfo(EV_P_ struct ev_signal *watcher, int revents) {
+ oxt::thread(printInfoInThread, "Information printer");
+}
- BackgroundEventLoop poolLoop;
- BackgroundEventLoop requestLoop;
+static void
+inspectRequestHandlerStateAsJson(RequestHandler *rh, string *result) {
+ *result = rh->inspectStateAsJson().toStyledString();
+}
- FileDescriptor requestSocket;
- ServerInstanceDir serverInstanceDir;
- ServerInstanceDir::GenerationPtr generation;
- UnionStation::CorePtr unionStationCore;
- RandomGeneratorPtr randomGenerator;
- SpawnerConfigPtr spawnerConfig;
- SpawnerFactoryPtr spawnerFactory;
- PoolPtr pool;
- ev::sig sigquitWatcher;
- AccountsDatabasePtr accountsDatabase;
- MessageServerPtr messageServer;
- ResourceLocator resourceLocator;
- boost::shared_ptr<RequestHandler> requestHandler;
- boost::shared_ptr<oxt::thread> prestarterThread;
- boost::shared_ptr<oxt::thread> messageServerThread;
- boost::shared_ptr<oxt::thread> eventLoopThread;
- EventFd exitEvent;
+static void
+inspectRequestHandlerConfigAsJson(RequestHandler *rh, string *result) {
+ *result = rh->getConfigAsJson().toStyledString();
+}
- /**
- * Starts listening for client connections on this server's request socket.
- *
- * @throws SystemException Something went wrong while trying to create and bind to the Unix socket.
- * @throws RuntimeException Something went wrong.
- */
- void startListening() {
- this_thread::disable_syscall_interruption dsi;
- requestSocket = createUnixServer(getRequestSocketFilename().c_str());
+static void
+getMbufStats(struct MemoryKit::mbuf_pool *input, struct MemoryKit::mbuf_pool *result) {
+ *result = *input;
+}
- int ret, e;
- do {
- ret = chmod(getRequestSocketFilename().c_str(), S_ISVTX |
- S_IRUSR | S_IWUSR | S_IXUSR |
- S_IRGRP | S_IWGRP | S_IXGRP |
- S_IROTH | S_IWOTH | S_IXOTH);
- } while (ret == -1 && errno == EINTR);
+static void
+printInfoInThread() {
+ TRACE_POINT();
+ WorkingObjects *wo = workingObjects;
+ unsigned int i;
- setNonBlocking(requestSocket);
+ cerr << "### Backtraces\n";
+ cerr << "\n" << oxt::thread::all_backtraces();
+ cerr << "\n";
+ cerr.flush();
- if (!options.requestSocketLink.empty()) {
- struct stat buf;
+ for (i = 0; i < wo->threadWorkingObjects.size(); i++) {
+ ThreadWorkingObjects *two = &wo->threadWorkingObjects[i];
+ string json;
- // If this is a symlink then we'll want to check the file the symlink
- // points to, so we use stat() instead of lstat().
- ret = syscalls::stat(options.requestSocketLink.c_str(), &buf);
- if (ret == 0 || (ret == -1 && errno == ENOENT)) {
- if (ret == -1 || buf.st_mode & S_IFSOCK) {
- if (syscalls::unlink(options.requestSocketLink.c_str()) == -1) {
- e = errno;
- throw FileSystemException("Cannot delete existing socket file '" +
- options.requestSocketLink + "'", e, options.requestSocketLink);
- }
- } else {
- throw RuntimeException("File '" + options.requestSocketLink +
- "' already exists and is not a Unix domain socket");
- }
- } else if (ret == -1 && errno != ENOENT) {
- e = errno;
- throw FileSystemException("Cannot stat() file '" + options.requestSocketLink + "'",
- e,
- options.requestSocketLink);
- }
+ cerr << "### Request handler state (thread " << (i + 1) << ")\n";
+ two->bgloop->safe->runSync(boost::bind(inspectRequestHandlerStateAsJson,
+ two->requestHandler, &json));
+ cerr << json;
+ cerr << "\n";
+ cerr.flush();
+ }
- do {
- ret = symlink(getRequestSocketFilename().c_str(),
- options.requestSocketLink.c_str());
- } while (ret == -1 && errno == EINTR);
- if (ret == -1) {
- e = errno;
- throw FileSystemException("Cannot create a symlink '" +
- options.requestSocketLink +
- "' to '" + getRequestSocketFilename() + "'",
- e,
- options.requestSocketLink);
- }
- }
+ for (i = 0; i < wo->threadWorkingObjects.size(); i++) {
+ ThreadWorkingObjects *two = &wo->threadWorkingObjects[i];
+ string json;
+
+ cerr << "### Request handler config (thread " << (i + 1) << ")\n";
+ two->bgloop->safe->runSync(boost::bind(inspectRequestHandlerConfigAsJson,
+ two->requestHandler, &json));
+ cerr << json;
+ cerr << "\n";
+ cerr.flush();
}
- /**
- * Lowers this process's privilege to that of <em>username</em> and <em>groupname</em>.
- */
- void lowerPrivilege(const string &username, const string &groupname) {
- struct passwd *userEntry;
- gid_t gid;
- int e;
+ struct MemoryKit::mbuf_pool stats;
+ cerr << "### mbuf stats\n\n";
+ wo->threadWorkingObjects[0].bgloop->safe->runSync(boost::bind(getMbufStats,
+ &wo->threadWorkingObjects[0].serverKitContext->mbuf_pool,
+ &stats));
+ cerr << "nfree_mbuf_blockq : " << stats.nfree_mbuf_blockq << "\n";
+ cerr << "nactive_mbuf_blockq : " << stats.nactive_mbuf_blockq << "\n";
+ cerr << "mbuf_block_chunk_size: " << stats.mbuf_block_chunk_size << "\n";
+ cerr << "\n";
+ cerr.flush();
- userEntry = getpwnam(username.c_str());
- if (userEntry == NULL) {
- throw NonExistentUserException(string("Unable to lower Passenger "
- "HelperAgent's privilege to that of user '") + username +
- "': user does not exist.");
- }
- gid = lookupGid(groupname);
- if (gid == (gid_t) -1) {
- throw NonExistentGroupException(string("Unable to lower Passenger "
- "HelperAgent's privilege to that of user '") + username +
- "': user does not exist.");
- }
+ cerr << "### Pool state\n";
+ cerr << "\n" << wo->appPool->inspect();
+ cerr << "\n";
+ cerr.flush();
+}
- if (initgroups(username.c_str(), userEntry->pw_gid) != 0) {
- e = errno;
- throw SystemException(string("Unable to lower Passenger HelperAgent's "
- "privilege to that of user '") + username +
- "': cannot set supplementary groups for this user", e);
- }
- if (setgid(gid) != 0) {
- e = errno;
- throw SystemException(string("Unable to lower Passenger HelperAgent's "
- "privilege to that of user '") + username +
- "': cannot set group ID", e);
- }
- if (setuid(userEntry->pw_uid) != 0) {
- e = errno;
- throw SystemException(string("Unable to lower Passenger HelperAgent's "
- "privilege to that of user '") + username +
- "': cannot set user ID", e);
- }
+static void
+dumpDiagnosticsOnCrash(void *userData) {
+ WorkingObjects *wo = workingObjects;
+ unsigned int i;
- setenv("HOME", userEntry->pw_dir, 1);
+ cerr << "### Backtraces\n";
+ cerr << oxt::thread::all_backtraces();
+ cerr.flush();
+
+ for (i = 0; i < wo->threadWorkingObjects.size(); i++) {
+ ThreadWorkingObjects *two = &wo->threadWorkingObjects[i];
+ cerr << "### Request handler state (thread " << (i + 1) << ")\n";
+ cerr << two->requestHandler->inspectStateAsJson();
+ cerr << "\n";
+ cerr.flush();
}
- void onSigquit(ev::sig &signal, int revents) {
- requestHandler->inspect(cerr);
+ for (i = 0; i < wo->threadWorkingObjects.size(); i++) {
+ ThreadWorkingObjects *two = &wo->threadWorkingObjects[i];
+ cerr << "### Request handler config (thread " << (i + 1) << ")\n";
+ cerr << two->requestHandler->getConfigAsJson();
+ cerr << "\n";
cerr.flush();
- cerr << "\n" << pool->inspect();
- cerr.flush();
- cerr << "\n" << oxt::thread::all_backtraces();
- cerr.flush();
}
- void installDiagnosticsDumper() {
- ::installDiagnosticsDumper(dumpDiagnosticsOnCrash, this);
+ cerr << "### Pool state (simple)\n";
+ // Do not lock, the crash may occur within the pool.
+ Pool::InspectOptions options;
+ options.verbose = true;
+ cerr << wo->appPool->inspect(options, false);
+ cerr << "\n";
+ cerr.flush();
+
+ cerr << "### mbuf stats\n\n";
+ cerr << "nfree_mbuf_blockq : " <<
+ wo->threadWorkingObjects[0].serverKitContext->mbuf_pool.nfree_mbuf_blockq << "\n";
+ cerr << "nactive_mbuf_blockq: " <<
+ wo->threadWorkingObjects[0].serverKitContext->mbuf_pool.nactive_mbuf_blockq << "\n";
+ cerr << "mbuf_block_chunk_size: " <<
+ wo->threadWorkingObjects[0].serverKitContext->mbuf_pool.mbuf_block_chunk_size << "\n";
+ cerr << "\n";
+ cerr.flush();
+
+ cerr << "### Pool state (XML)\n";
+ cerr << wo->appPool->toXml(true, false);
+ cerr << "\n\n";
+ cerr.flush();
+}
+
+static void
+onTerminationSignal(EV_P_ struct ev_signal *watcher, int revents) {
+ WorkingObjects *wo = workingObjects;
+
+ // Start output after '^C'
+ printf("\n");
+
+ wo->terminationCount++;
+ if (wo->terminationCount < 3) {
+ P_NOTICE("Signal received. Gracefully shutting down... (send signal " <<
+ (3 - wo->terminationCount) << " more time(s) to force shutdown)");
+ workingObjects->exitEvent.notify();
+ } else {
+ P_NOTICE("Signal received. Forcing shutdown.");
+ _exit(2);
}
+}
- void uninstallDiagnosticsDumper() {
- ::installDiagnosticsDumper(NULL, NULL);
+static void
+initializeNonPrivilegedWorkingObjects() {
+ TRACE_POINT();
+ VariantMap &options = *agentsOptions;
+ WorkingObjects *wo = workingObjects;
+
+ if (options.get("server_software").find(SERVER_TOKEN_NAME) == string::npos
+ && options.get("server_software").find(FLYING_PASSENGER_NAME) == string::npos)
+ {
+ options.set("server_software", options.get("server_software") +
+ (" " SERVER_TOKEN_NAME "/" PASSENGER_VERSION));
}
+ setenv("SERVER_SOFTWARE", options.get("server_software").c_str(), 1);
+ options.set("data_buffer_dir", absolutizePath(options.get("data_buffer_dir")));
- static void dumpDiagnosticsOnCrash(void *userData) {
- Server *self = (Server *) userData;
+ vector<string> addresses = options.getStrSet("server_addresses");
+ vector<string> adminAddresses = options.getStrSet("server_admin_addresses", false);
- cerr << "### Request handler state\n";
- self->requestHandler->inspect(cerr);
- cerr << "\n";
- cerr.flush();
+ wo->resourceLocator = ResourceLocator(options.get("passenger_root"));
- cerr << "### Pool state (simple)\n";
- // Do not lock, the crash may occur within the pool.
- Pool::InspectOptions options;
- options.verbose = true;
- cerr << self->pool->inspect(options, false);
- cerr << "\n";
- cerr.flush();
+ wo->randomGenerator = boost::make_shared<RandomGenerator>();
+ // Check whether /dev/urandom is actually random.
+ // https://code.google.com/p/phusion-passenger/issues/detail?id=516
+ if (wo->randomGenerator->generateByteString(16) == wo->randomGenerator->generateByteString(16)) {
+ throw RuntimeException("Your random number device, /dev/urandom, appears to be broken. "
+ "It doesn't seem to be returning random data. Please fix this.");
+ }
- cerr << "### Pool state (XML)\n";
- cerr << self->pool->toXml(true, false);
- cerr << "\n\n";
- cerr.flush();
+ UPDATE_TRACE_POINT();
+ if (options.has("logging_agent_address")) {
+ wo->unionStationCore = boost::make_shared<UnionStation::Core>(
+ options.get("logging_agent_address"),
+ "logging",
+ options.get("logging_agent_password"));
+ }
- cerr << "### Backtraces\n";
- cerr << oxt::thread::all_backtraces();
- cerr.flush();
+ UPDATE_TRACE_POINT();
+ wo->spawnerConfig = boost::make_shared<SpawnerConfig>();
+ wo->spawnerConfig->resourceLocator = &wo->resourceLocator;
+ wo->spawnerConfig->agentsOptions = agentsOptions;
+ wo->spawnerConfig->randomGenerator = wo->randomGenerator;
+ wo->spawnerConfig->instanceDir = options.get("instance_dir", false);
+ if (!wo->spawnerConfig->instanceDir.empty()) {
+ wo->spawnerConfig->instanceDir = absolutizePath(wo->spawnerConfig->instanceDir);
}
+ wo->spawnerConfig->finalize();
-public:
- Server(FileDescriptor feedbackFd, const AgentOptions &_options)
- : options(_options),
- requestLoop(true),
- serverInstanceDir(_options.serverInstanceDir, false),
- resourceLocator(options.passengerRoot)
- {
- TRACE_POINT();
- this->feedbackFd = feedbackFd;
+ UPDATE_TRACE_POINT();
+ wo->spawnerFactory = boost::make_shared<SpawnerFactory>(wo->spawnerConfig);
+ wo->appPool = boost::make_shared<Pool>(wo->spawnerFactory, agentsOptions);
+ wo->appPool->initialize();
+ wo->appPool->setMax(options.getInt("max_pool_size"));
+ wo->appPool->setMaxIdleTime(options.getInt("pool_idle_time") * 1000000);
+ wo->appPool->enableSelfChecking(options.getBool("selfchecks"));
+ wo->appPool->abortLongRunningConnectionsCallback = abortLongRunningConnections;
+ UPDATE_TRACE_POINT();
+ unsigned int nthreads = options.getInt("server_threads");
+ BackgroundEventLoop *firstLoop = NULL; // Avoid compiler warning
+ wo->threadWorkingObjects.reserve(nthreads);
+ for (unsigned int i = 0; i < nthreads; i++) {
UPDATE_TRACE_POINT();
- generation = serverInstanceDir.getGeneration(options.generationNumber);
- startListening();
- accountsDatabase = boost::make_shared<AccountsDatabase>();
- accountsDatabase->add("_passenger-status", options.adminToolStatusPassword, false,
- Account::INSPECT_BASIC_INFO | Account::INSPECT_SENSITIVE_INFO |
- Account::INSPECT_BACKTRACES | Account::INSPECT_REQUESTS |
- Account::DETACH | Account::RESTART);
- accountsDatabase->add("_web_server", options.exitPassword, false, Account::EXIT);
- messageServer = boost::make_shared<MessageServer>(
- parseUnixSocketAddress(options.adminSocketAddress), accountsDatabase);
+ ThreadWorkingObjects two;
- createFile(generation->getPath() + "/helper_agent.pid",
- toString(getpid()), S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH);
-
- if (geteuid() == 0 && !options.userSwitching) {
- lowerPrivilege(options.defaultUser, options.defaultGroup);
+ if (i == 0) {
+ two.bgloop = firstLoop = new BackgroundEventLoop(true, true);
+ } else {
+ two.bgloop = new BackgroundEventLoop(true, false);
}
UPDATE_TRACE_POINT();
- randomGenerator = boost::make_shared<RandomGenerator>();
- // Check whether /dev/urandom is actually random.
- // https://code.google.com/p/phusion-passenger/issues/detail?id=516
- if (randomGenerator->generateByteString(16) == randomGenerator->generateByteString(16)) {
- throw RuntimeException("Your random number device, /dev/urandom, appears to be broken. "
- "It doesn't seem to be returning random data. Please fix this.");
- }
+ two.serverKitContext = new ServerKit::Context(two.bgloop->safe);
+ two.serverKitContext->secureModePassword = wo->password;
+ two.serverKitContext->defaultFileBufferedChannelConfig.bufferDir =
+ options.get("data_buffer_dir");
UPDATE_TRACE_POINT();
- unionStationCore = boost::make_shared<UnionStation::Core>(options.loggingAgentAddress,
- "logging", options.loggingAgentPassword);
- spawnerConfig = boost::make_shared<SpawnerConfig>(resourceLocator, unionStationCore,
- randomGenerator, &options);
- spawnerFactory = boost::make_shared<SpawnerFactory>(generation, spawnerConfig);
- pool = boost::make_shared<Pool>(spawnerFactory, &options);
- pool->initialize();
- pool->setMax(options.maxPoolSize);
- pool->setMaxIdleTime(options.poolIdleTime * 1000000);
+ two.requestHandler = new RequestHandler(two.serverKitContext, agentsOptions, i + 1);
+ two.requestHandler->minSpareClients = 128;
+ two.requestHandler->clientFreelistLimit = 1024;
+ two.requestHandler->resourceLocator = &wo->resourceLocator;
+ two.requestHandler->appPool = wo->appPool;
+ two.requestHandler->unionStationCore = wo->unionStationCore;
+ two.requestHandler->shutdownFinishCallback = requestHandlerShutdownFinished;
+ two.requestHandler->initialize();
+ wo->shutdownCounter.fetch_add(1, boost::memory_order_relaxed);
- requestHandler = boost::make_shared<RequestHandler>(requestLoop.safe,
- requestSocket, pool, options);
+ wo->threadWorkingObjects.push_back(two);
+ }
- messageServer->addHandler(boost::make_shared<RemoteController>(requestHandler, pool));
- messageServer->addHandler(ptr(new ExitHandler(exitEvent)));
+ UPDATE_TRACE_POINT();
+ ev_signal_init(&wo->sigquitWatcher, printInfo, SIGQUIT);
+ ev_signal_start(firstLoop->loop, &wo->sigquitWatcher);
+ ev_signal_init(&wo->sigintWatcher, onTerminationSignal, SIGINT);
+ ev_signal_start(firstLoop->loop, &wo->sigintWatcher);
+ ev_signal_init(&wo->sigtermWatcher, onTerminationSignal, SIGTERM);
+ ev_signal_start(firstLoop->loop, &wo->sigtermWatcher);
- sigquitWatcher.set(requestLoop.loop);
- sigquitWatcher.set(SIGQUIT);
- sigquitWatcher.set<Server, &Server::onSigquit>(this);
- sigquitWatcher.start();
+ UPDATE_TRACE_POINT();
+ if (!adminAddresses.empty()) {
+ UPDATE_TRACE_POINT();
+ AdminWorkingObjects *awo = &wo->adminWorkingObjects;
+ awo->bgloop = new BackgroundEventLoop(true, false);
+ awo->serverKitContext = new ServerKit::Context(awo->bgloop->safe);
+ awo->serverKitContext->secureModePassword = wo->password;
+ // Configure a large threshold so that it uses libeio as little as possible.
+ // libeio runs on the RequestHandler's first thread, and if there's a
+ // problem there we don't want it to affect the admin server.
+ awo->serverKitContext->defaultFileBufferedChannelConfig.threshold = 1024 * 1024;
+ awo->serverKitContext->defaultFileBufferedChannelConfig.bufferDir =
+ options.get("data_buffer_dir");
+
UPDATE_TRACE_POINT();
- writeArrayMessage(feedbackFd,
- "initialized",
- getRequestSocketFilename().c_str(),
- messageServer->getSocketFilename().c_str(),
- NULL);
+ awo->adminServer = new ServerAgent::AdminServer(awo->serverKitContext);
+ awo->adminServer->requestHandlers.reserve(wo->threadWorkingObjects.size());
+ for (unsigned int i = 0; i < wo->threadWorkingObjects.size(); i++) {
+ awo->adminServer->requestHandlers.push_back(
+ wo->threadWorkingObjects[i].requestHandler);
+ }
+ awo->adminServer->appPool = wo->appPool;
+ awo->adminServer->exitEvent = &wo->exitEvent;
+ awo->adminServer->shutdownFinishCallback = adminServerShutdownFinished;
+ awo->adminServer->authorizations = wo->adminAuthorizations;
- boost::function<void ()> func = boost::bind(prestartWebApps,
- resourceLocator,
- options.defaultRubyCommand,
- options.prestartUrls
- );
- prestarterThread = ptr(new oxt::thread(
- boost::bind(runAndPrintExceptions, func, true)
- ));
+ wo->shutdownCounter.fetch_add(1, boost::memory_order_relaxed);
}
- ~Server() {
- TRACE_POINT();
- this_thread::disable_syscall_interruption dsi;
- this_thread::disable_interruption di;
-
- P_DEBUG("Shutting down helper agent...");
- prestarterThread->interrupt_and_join();
- if (messageServerThread != NULL) {
- messageServerThread->interrupt_and_join();
+ UPDATE_TRACE_POINT();
+ /* We do not delete Unix domain socket files at shutdown because
+ * that can cause a race condition if the user tries to start another
+ * server with the same addresses at the same time. The new server
+ * would then delete the socket and replace it with its own,
+ * while the old server would delete the file yet again shortly after.
+ * This is especially noticeable on systems that heavily swap.
+ */
+ for (unsigned int i = 0; i < addresses.size(); i++) {
+ if (nthreads == 1) {
+ ThreadWorkingObjects *two = &wo->threadWorkingObjects[0];
+ two->requestHandler->listen(wo->serverFds[i]);
+ } else {
+ wo->loadBalancer.listen(wo->serverFds[i]);
}
+ }
+ for (unsigned int i = 0; i < nthreads; i++) {
+ ThreadWorkingObjects *two = &wo->threadWorkingObjects[i];
+ two->requestHandler->createSpareClients();
+ }
+ if (nthreads > 1) {
+ wo->loadBalancer.servers.reserve(nthreads);
+ for (unsigned int i = 0; i < nthreads; i++) {
+ ThreadWorkingObjects *two = &wo->threadWorkingObjects[i];
+ wo->loadBalancer.servers.push_back(two->requestHandler);
+ }
+ }
+ for (unsigned int i = 0; i < adminAddresses.size(); i++) {
+ wo->adminWorkingObjects.adminServer->listen(wo->adminServerFds[i]);
+ }
+}
- messageServer.reset();
- P_DEBUG("Destroying application pool...");
- pool->destroy();
- uninstallDiagnosticsDumper();
- pool.reset();
- poolLoop.stop();
- requestLoop.stop();
- requestHandler.reset();
+static void
+prestartWebApps() {
+ TRACE_POINT();
+ VariantMap &options = *agentsOptions;
+ WorkingObjects *wo = workingObjects;
- if (!options.requestSocketLink.empty()) {
- char path[PATH_MAX + 1];
- ssize_t ret;
- bool shouldUnlink;
+ boost::function<void ()> func = boost::bind(prestartWebApps,
+ wo->resourceLocator,
+ options.get("default_ruby"),
+ options.getStrSet("prestart_urls", false)
+ );
+ wo->prestarterThread = new oxt::thread(
+ boost::bind(runAndPrintExceptions, func, true)
+ );
+}
- ret = readlink(options.requestSocketLink.c_str(), path, PATH_MAX);
- if (ret != -1) {
- path[ret] = '\0';
- // Only unlink if a new Flying Passenger instance hasn't overwritten the
- // symlink.
- // https://code.google.com/p/phusion-passenger/issues/detail?id=939
- shouldUnlink = getRequestSocketFilename() == path;
- } else {
- shouldUnlink = true;
+static void
+reportInitializationInfo() {
+ TRACE_POINT();
+ if (feedbackFdAvailable()) {
+ P_NOTICE(AGENT_EXE " server online, PID " << getpid());
+ writeArrayMessage(FEEDBACK_FD,
+ "initialized",
+ NULL);
+ } else {
+ vector<string> addresses = agentsOptions->getStrSet("server_addresses");
+ vector<string> adminAddresses = agentsOptions->getStrSet("server_admin_addresses", false);
+ string address;
+
+ P_NOTICE(AGENT_EXE " server online, PID " << getpid() <<
+ ", listening on " << addresses.size() << " socket(s):");
+ foreach (address, addresses) {
+ if (startsWith(address, "tcp://")) {
+ address.erase(0, sizeof("tcp://") - 1);
+ address.insert(0, "http://");
+ address.append("/");
}
+ P_NOTICE(" * " << address);
+ }
- if (shouldUnlink) {
- syscalls::unlink(options.requestSocketLink.c_str());
+ if (!adminAddresses.empty()) {
+ P_NOTICE("Admin server listening on " << adminAddresses.size() << " socket(s):");
+ foreach (address, adminAddresses) {
+ if (startsWith(address, "tcp://")) {
+ address.erase(0, sizeof("tcp://") - 1);
+ address.insert(0, "http://");
+ address.append("/");
+ }
+ P_NOTICE(" * " << address);
}
}
+ }
+}
- P_TRACE(2, "All threads have been shut down.");
+static void
+mainLoop() {
+ TRACE_POINT();
+ WorkingObjects *wo = workingObjects;
+ #ifdef SUPPORTS_PER_THREAD_CPU_AFFINITY
+ unsigned int maxCpus = boost::thread::hardware_concurrency();
+ bool cpuAffine = agentsOptions->getBool("server_cpu_affine")
+ && maxCpus <= CPU_SETSIZE;
+ #endif
+
+ installDiagnosticsDumper(dumpDiagnosticsOnCrash, NULL);
+ for (unsigned int i = 0; i < wo->threadWorkingObjects.size(); i++) {
+ ThreadWorkingObjects *two = &wo->threadWorkingObjects[i];
+ two->bgloop->start("Main event loop: thread " + toString(i + 1), 0);
+ #ifdef SUPPORTS_PER_THREAD_CPU_AFFINITY
+ if (cpuAffine) {
+ cpu_set_t cpus;
+ int result;
+
+ CPU_ZERO(&cpus);
+ CPU_SET(i % maxCpus, &cpus);
+ P_DEBUG("Setting CPU affinity of server thread " << (i + 1)
+ << " to CPU " << (i % maxCpus + 1));
+ result = pthread_setaffinity_np(two->bgloop->getNativeHandle(),
+ maxCpus, &cpus);
+ if (result != 0) {
+ P_WARN("Cannot set CPU affinity on server thread " << (i + 1)
+ << ": " << strerror(result) << " (errno=" << result << ")");
+ }
+ }
+ #endif
}
+ if (wo->adminWorkingObjects.adminServer != NULL) {
+ wo->adminWorkingObjects.bgloop->start("Admin event loop", 0);
+ }
+ if (wo->threadWorkingObjects.size() > 1) {
+ wo->loadBalancer.start();
+ }
+ waitForExitEvent();
+}
- void mainLoop() {
- TRACE_POINT();
- boost::function<void ()> func;
+static void
+abortLongRunningConnectionsOnRequestHandler(RequestHandler *requestHandler,
+ string gupid)
+{
+ requestHandler->disconnectLongRunningConnections(gupid);
+}
- func = boost::bind(&MessageServer::mainLoop, messageServer.get());
- messageServerThread = ptr(new oxt::thread(
- boost::bind(runAndPrintExceptions, func, true),
- "MessageServer thread", MESSAGE_SERVER_THREAD_STACK_SIZE
- ));
+static void
+abortLongRunningConnections(const ApplicationPool2::ProcessPtr &process) {
+ // We are inside the ApplicationPool lock. Be very careful here.
+ WorkingObjects *wo = workingObjects;
+ P_NOTICE("Disconnecting long-running connections for process " <<
+ process->pid << ", application " << process->getGroup()->name);
+ for (unsigned int i = 0; i < wo->threadWorkingObjects.size(); i++) {
+ wo->threadWorkingObjects[i].bgloop->safe->runLater(
+ boost::bind(abortLongRunningConnectionsOnRequestHandler,
+ wo->threadWorkingObjects[i].requestHandler,
+ string(process->gupid, process->gupidSize)));
+ }
+}
- poolLoop.start("Pool event loop", 0);
- requestLoop.start("Request event loop", 0);
+static void
+shutdownRequestHandler(ThreadWorkingObjects *two) {
+ two->requestHandler->shutdown();
+}
+static void
+shutdownAdminServer() {
+ workingObjects->adminWorkingObjects.adminServer->shutdown();
+}
- /* Wait until the watchdog closes the feedback fd (meaning it
- * was killed) or until we receive an exit message.
+static void
+serverShutdownFinished() {
+ unsigned int i = workingObjects->shutdownCounter.fetch_sub(1, boost::memory_order_release);
+ if (i == 1) {
+ boost::atomic_thread_fence(boost::memory_order_acquire);
+ workingObjects->allClientsDisconnectedEvent.notify();
+ }
+}
+
+static void
+requestHandlerShutdownFinished(RequestHandler *server) {
+ serverShutdownFinished();
+}
+
+static void
+adminServerShutdownFinished(ServerAgent::AdminServer *server) {
+ serverShutdownFinished();
+}
+
+/* Wait until the watchdog closes the feedback fd (meaning it
+ * was killed) or until we receive an exit message.
+ */
+static void
+waitForExitEvent() {
+ this_thread::disable_syscall_interruption dsi;
+ WorkingObjects *wo = workingObjects;
+ fd_set fds;
+ int largestFd = -1;
+
+ FD_ZERO(&fds);
+ if (feedbackFdAvailable()) {
+ FD_SET(FEEDBACK_FD, &fds);
+ largestFd = std::max(largestFd, FEEDBACK_FD);
+ }
+ FD_SET(wo->exitEvent.fd(), &fds);
+ largestFd = std::max(largestFd, wo->exitEvent.fd());
+
+ TRACE_POINT();
+ if (syscalls::select(largestFd + 1, &fds, NULL, NULL, NULL) == -1) {
+ int e = errno;
+ installDiagnosticsDumper(NULL, NULL);
+ throw SystemException("select() failed", e);
+ }
+
+ if (FD_ISSET(FEEDBACK_FD, &fds)) {
+ UPDATE_TRACE_POINT();
+ /* If the watchdog has been killed then we'll kill all descendant
+ * processes and exit. There's no point in keeping the server agent
+ * running because we can't detect when the web server exits,
+ * and because this server agent doesn't own the instance
+ * directory. As soon as passenger-status is run, the instance
+ * directory will be cleaned up, making the server inaccessible.
*/
- this_thread::disable_syscall_interruption dsi;
- fd_set fds;
- int largestFd;
+ P_WARN("Watchdog seems to be killed; forcing shutdown of all subprocesses");
+ // We send a SIGTERM first to allow processes to gracefully shut down.
+ syscalls::killpg(getpgrp(), SIGTERM);
+ usleep(500000);
+ syscalls::killpg(getpgrp(), SIGKILL);
+ _exit(2); // In case killpg() fails.
+ } else {
+ UPDATE_TRACE_POINT();
+ /* We received an exit command. */
+ P_NOTICE("Received command to shutdown gracefully. "
+ "Waiting until all clients have disconnected...");
+ wo->appPool->prepareForShutdown();
- FD_ZERO(&fds);
- FD_SET(feedbackFd, &fds);
- FD_SET(exitEvent.fd(), &fds);
- largestFd = (feedbackFd > exitEvent.fd()) ? (int) feedbackFd : exitEvent.fd();
+ for (unsigned i = 0; i < wo->threadWorkingObjects.size(); i++) {
+ ThreadWorkingObjects *two = &wo->threadWorkingObjects[i];
+ two->bgloop->safe->runLater(boost::bind(shutdownRequestHandler, two));
+ }
+ if (wo->threadWorkingObjects.size() > 1) {
+ wo->loadBalancer.shutdown();
+ }
+ if (wo->adminWorkingObjects.adminServer != NULL) {
+ wo->adminWorkingObjects.bgloop->safe->runLater(shutdownAdminServer);
+ }
+
UPDATE_TRACE_POINT();
- installDiagnosticsDumper();
- if (syscalls::select(largestFd + 1, &fds, NULL, NULL, NULL) == -1) {
+ FD_ZERO(&fds);
+ FD_SET(wo->allClientsDisconnectedEvent.fd(), &fds);
+ if (syscalls::select(wo->allClientsDisconnectedEvent.fd() + 1,
+ &fds, NULL, NULL, NULL) == -1)
+ {
int e = errno;
- uninstallDiagnosticsDumper();
+ installDiagnosticsDumper(NULL, NULL);
throw SystemException("select() failed", e);
}
- if (FD_ISSET(feedbackFd, &fds)) {
- /* If the watchdog has been killed then we'll kill all descendant
- * processes and exit. There's no point in keeping this helper
- * server running because we can't detect when the web server exits,
- * and because this helper agent doesn't own the server instance
- * directory. As soon as passenger-status is run, the server
- * instance directory will be cleaned up, making this helper agent
- * inaccessible.
- */
- P_DEBUG("Watchdog seems to be killed; forcing shutdown of all subprocesses");
- // We send a SIGTERM first to allow processes to gracefully shut down.
- syscalls::killpg(getpgrp(), SIGTERM);
- usleep(500000);
- syscalls::killpg(getpgrp(), SIGKILL);
- _exit(2); // In case killpg() fails.
- } else {
- /* We received an exit command. We want to exit 5 seconds after
- * all clients have disconnected have become inactive.
- */
- P_DEBUG("Received command to exit gracefully. "
- "Waiting until 5 seconds after all clients have disconnected...");
- pool->prepareForShutdown();
- requestHandler->resetInactivityTime();
- while (requestHandler->inactivityTime() < 5000) {
- syscalls::usleep(250000);
- }
- P_DEBUG("It's now 5 seconds after all clients have disconnected. "
- "Proceeding with graceful exit.");
- }
+ P_INFO("All clients have now disconnected. Proceeding with graceful shutdown");
}
+}
- string getRequestSocketFilename() const {
- return options.requestSocketFilename;
- }
-};
-
-/**
- * Initializes and starts the helper agent that is responsible for handling communication
- * between Nginx and the backend Rails processes.
- *
- * @see Server
- * @see Client
- */
-int
-main(int argc, char *argv[]) {
+static void
+cleanup() {
TRACE_POINT();
+ WorkingObjects *wo = workingObjects;
- if (argc > 1 && strcmp(argv[1], "system-metrics") == 0) {
- return SystemMetricsTool::main(argc, argv);
+ P_DEBUG("Shutting down " AGENT_EXE " server...");
+ wo->appPool->destroy();
+ installDiagnosticsDumper(NULL, NULL);
+ for (unsigned i = 0; i < wo->threadWorkingObjects.size(); i++) {
+ ThreadWorkingObjects *two = &wo->threadWorkingObjects[i];
+ two->bgloop->stop();
}
-
- AgentOptionsPtr options;
- try {
- options = boost::make_shared<AgentOptions>(
- initializeAgent(argc, argv, "PassengerHelperAgent"));
- } catch (const VariantMap::MissingKeyException &e) {
- fprintf(stderr, "Option required: %s\n", e.getKey().c_str());
- return 1;
+ if (wo->adminWorkingObjects.adminServer != NULL) {
+ wo->adminWorkingObjects.bgloop->stop();
}
- if (options->testBinary) {
- printf("PASS\n");
- exit(0);
+ wo->appPool.reset();
+ for (unsigned i = 0; i < wo->threadWorkingObjects.size(); i++) {
+ ThreadWorkingObjects *two = &wo->threadWorkingObjects[i];
+ delete two->requestHandler;
}
+ if (wo->prestarterThread != NULL) {
+ wo->prestarterThread->interrupt_and_join();
+ delete wo->prestarterThread;
+ }
+ for (unsigned int i = 0; i < SERVER_KIT_MAX_SERVER_ENDPOINTS; i++) {
+ if (wo->serverFds[i] != -1) {
+ close(wo->serverFds[i]);
+ }
+ if (wo->adminServerFds[i] != -1) {
+ close(wo->adminServerFds[i]);
+ }
+ }
+ deletePidFile();
+ P_NOTICE(AGENT_EXE " server shutdown finished");
+}
- P_DEBUG("Starting PassengerHelperAgent...");
- MultiLibeio::init();
+static void
+deletePidFile() {
+ TRACE_POINT();
+ string pidFile = agentsOptions->get("server_pid_file", false);
+ if (!pidFile.empty()) {
+ syscalls::unlink(pidFile.c_str());
+ }
+}
+static int
+runServer() {
+ TRACE_POINT();
+ P_NOTICE("Starting " AGENT_EXE " server...");
+
try {
UPDATE_TRACE_POINT();
- Server server(FileDescriptor(FEEDBACK_FD), *options);
- P_WARN("PassengerHelperAgent online, listening at unix:" <<
- server.getRequestSocketFilename());
+ initializePrivilegedWorkingObjects();
+ initializeSingleAppMode();
+ startListening();
+ createPidFile();
+ lowerPrivilege();
+ initializeNonPrivilegedWorkingObjects();
+ prestartWebApps();
UPDATE_TRACE_POINT();
- server.mainLoop();
+ reportInitializationInfo();
+ mainLoop();
+
+ UPDATE_TRACE_POINT();
+ cleanup();
} catch (const tracable_exception &e) {
- P_ERROR("*** ERROR: " << e.what() << "\n" << e.backtrace());
+ P_CRITICAL("ERROR: " << e.what() << "\n" << e.backtrace());
+ deletePidFile();
return 1;
}
- MultiLibeio::shutdown();
- P_TRACE(2, "Helper agent exiting with code 0.");
return 0;
+}
+
+
+/***** Entry point and command line argument parsing *****/
+
+static void
+parseOptions(int argc, const char *argv[], VariantMap &options) {
+ OptionParser p(serverUsage);
+ int i = 2;
+
+ while (i < argc) {
+ if (parseServerOption(argc, argv, i, options)) {
+ continue;
+ } else if (p.isFlag(argv[i], 'h', "--help")) {
+ serverUsage();
+ exit(0);
+ } else {
+ fprintf(stderr, "ERROR: unrecognized argument %s. Please type "
+ "'%s server --help' for usage.\n", argv[i], argv[0]);
+ exit(1);
+ }
+ }
+}
+
+static void
+preinitialize(VariantMap &options) {
+ // Set log_level here so that initializeAgent() calls setLogLevel()
+ // and setLogFile() with the right value.
+ if (options.has("server_log_level")) {
+ options.setInt("log_level", options.getInt("server_log_level"));
+ }
+ if (options.has("server_log_file")) {
+ options.set("debug_log_file", options.get("server_log_file"));
+ }
+}
+
+static void
+setAgentsOptionsDefaults() {
+ VariantMap &options = *agentsOptions;
+ set<string> defaultAddress;
+ defaultAddress.insert(DEFAULT_HTTP_SERVER_LISTEN_ADDRESS);
+
+ options.setDefaultStrSet("server_addresses", defaultAddress);
+ options.setDefaultBool("multi_app", false);
+ options.setDefault("environment", DEFAULT_APP_ENV);
+ options.setDefault("spawn_method", DEFAULT_SPAWN_METHOD);
+ options.setDefaultBool("load_shell_envvars", false);
+ options.setDefault("concurrency_model", DEFAULT_CONCURRENCY_MODEL);
+ options.setDefaultInt("app_thread_count", DEFAULT_APP_THREAD_COUNT);
+ options.setDefaultInt("max_pool_size", DEFAULT_MAX_POOL_SIZE);
+ options.setDefaultInt("pool_idle_time", DEFAULT_POOL_IDLE_TIME);
+ options.setDefaultInt("min_instances", 1);
+ options.setDefaultInt("stat_throttle_rate", DEFAULT_STAT_THROTTLE_RATE);
+ options.setDefault("server_software", SERVER_TOKEN_NAME "/" PASSENGER_VERSION);
+ options.setDefaultBool("show_version_in_header", true);
+ options.setDefaultBool("turbocaching", true);
+ options.setDefault("data_buffer_dir", getSystemTempDir());
+ options.setDefaultBool("selfchecks", false);
+ options.setDefaultBool("server_graceful_exit", true);
+ options.setDefaultInt("server_threads", boost::thread::hardware_concurrency());
+ options.setDefaultBool("server_cpu_affine", false);
+ options.setDefault("friendly_error_pages", "auto");
+ options.setDefaultBool("rolling_restarts", false);
+
+ string firstAddress = options.getStrSet("server_addresses")[0];
+ if (getSocketAddressType(firstAddress) == SAT_TCP) {
+ string host;
+ unsigned short port;
+
+ parseTcpSocketAddress(firstAddress, host, port);
+ options.setDefault("default_server_name", host);
+ options.setDefaultInt("default_server_port", port);
+ } else {
+ options.setDefault("default_server_name", "localhost");
+ options.setDefaultInt("default_server_port", 80);
+ }
+
+ options.setDefault("default_ruby", DEFAULT_RUBY);
+ if (!options.getBool("multi_app") && !options.has("app_root")) {
+ char *pwd = getcwd(NULL, 0);
+ options.set("app_root", pwd);
+ free(pwd);
+ }
+}
+
+static void
+sanityCheckOptions() {
+ VariantMap &options = *agentsOptions;
+ bool ok = true;
+
+ if (!options.has("passenger_root")) {
+ fprintf(stderr, "ERROR: please set the --passenger-root argument.\n");
+ ok = false;
+ }
+ if (options.getBool("multi_app") && options.has("app_root")) {
+ fprintf(stderr, "ERROR: you may not specify an application directory "
+ "when in multi-app mode.\n");
+ ok = false;
+ }
+ if (!options.getBool("multi_app") && options.has("app_type")) {
+ PassengerAppType appType = getAppType(options.get("app_type"));
+ if (appType == PAT_NONE || appType == PAT_ERROR) {
+ fprintf(stderr, "ERROR: '%s' is not a valid applicaion type. Supported app types are:",
+ options.get("app_type").c_str());
+ const AppTypeDefinition *definition = &appTypeDefinitions[0];
+ while (definition->type != PAT_NONE) {
+ fprintf(stderr, " %s", definition->name);
+ definition++;
+ }
+ fprintf(stderr, "\n");
+ ok = false;
+ }
+
+ if (!options.has("startup_file")) {
+ fprintf(stderr, "ERROR: if you've passed --app-type, then you must also pass --startup-file.\n");
+ ok = false;
+ }
+ }
+ if (options.get("concurrency_model") != "process" && options.get("concurrency_model") != "thread") {
+ fprintf(stderr, "ERROR: '%s' is not a valid concurrency model. Supported concurrency "
+ "models are: process, thread.\n",
+ options.get("concurrency_model").c_str());
+ ok = false;
+ } else if (options.get("concurrency_model") != "process") {
+ #ifndef PASSENGER_IS_ENTERPRISE
+ fprintf(stderr, "ERROR: the '%s' concurrency model is only supported in "
+ PROGRAM_NAME " Enterprise.\nYou are currently using the open source "
+ PROGRAM_NAME ". Buy " PROGRAM_NAME " Enterprise here: https://www.phusionpassenger.com/enterprise\n",
+ options.get("concurrency_model").c_str());
+ ok = false;
+ #endif
+ }
+ if (options.getInt("app_thread_count") < 1) {
+ fprintf(stderr, "ERROR: the value passed to --app-thread-count must be at least 1.\n");
+ ok = false;
+ } else if (options.getInt("app_thread_count") > 1) {
+ #ifndef PASSENGER_IS_ENTERPRISE
+ fprintf(stderr, "ERROR: the --app-thread-count option is only supported in "
+ PROGRAM_NAME " Enterprise.\nYou are currently using the open source "
+ PROGRAM_NAME ". Buy " PROGRAM_NAME " Enterprise here: https://www.phusionpassenger.com/enterprise\n");
+ ok = false;
+ #endif
+ }
+ if (RequestHandler::parseBenchmarkMode(options.get("benchmark_mode", false))
+ == RequestHandler::BM_UNKNOWN)
+ {
+ fprintf(stderr, "ERROR: '%s' is not a valid mode for --benchmark.\n",
+ options.get("benchmark_mode", false).c_str());
+ ok = false;
+ }
+ if (options.getInt("server_threads") < 1) {
+ fprintf(stderr, "ERROR: you may only specify for --threads a number greater than or equal to 1.\n");
+ ok = false;
+ }
+
+ if (!ok) {
+ exit(1);
+ }
+}
+
+int
+serverMain(int argc, char *argv[]) {
+ agentsOptions = new VariantMap();
+ *agentsOptions = initializeAgent(argc, &argv, AGENT_EXE " server", parseOptions,
+ preinitialize, 2);
+ setAgentsOptionsDefaults();
+ sanityCheckOptions();
+ return runServer();
}