00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020 #ifndef _PASSENGER_STANDARD_APPLICATION_POOL_H_
00021 #define _PASSENGER_STANDARD_APPLICATION_POOL_H_
00022
00023 #include <boost/shared_ptr.hpp>
00024 #include <boost/weak_ptr.hpp>
00025 #include <boost/thread.hpp>
00026 #include <boost/bind.hpp>
00027 #include <boost/date_time/microsec_time_clock.hpp>
00028 #include <boost/date_time/posix_time/posix_time.hpp>
00029
00030 #include <oxt/system_calls.hpp>
00031 #include <oxt/backtrace.hpp>
00032
00033 #include <string>
00034 #include <sstream>
00035 #include <map>
00036 #include <list>
00037
00038 #include <sys/types.h>
00039 #include <sys/stat.h>
00040 #include <stdio.h>
00041 #include <unistd.h>
00042 #include <ctime>
00043 #include <cerrno>
00044 #ifdef TESTING_APPLICATION_POOL
00045 #include <cstdlib>
00046 #endif
00047
00048 #include "ApplicationPool.h"
00049 #include "Logging.h"
00050 #include "FileChecker.h"
00051 #include "CachedFileStat.h"
00052 #ifdef PASSENGER_USE_DUMMY_SPAWN_MANAGER
00053 #include "DummySpawnManager.h"
00054 #else
00055 #include "SpawnManager.h"
00056 #endif
00057
00058 namespace Passenger {
00059
00060 using namespace std;
00061 using namespace boost;
00062 using namespace oxt;
00063
00064 class ApplicationPoolServer;
00065
00066
00067
00068
00069
00070
00071
00072
00073
00074
00075
00076
00077
00078
00079
00080
00081
00082
00083
00084
00085
00086
00087
00088
00089
00090
00091
00092
00093
00094
00095
00096
00097 class StandardApplicationPool: public ApplicationPool {
00098 private:
00099 static const int DEFAULT_MAX_IDLE_TIME = 120;
00100 static const int DEFAULT_MAX_POOL_SIZE = 20;
00101 static const int DEFAULT_MAX_INSTANCES_PER_APP = 0;
00102 static const int CLEANER_THREAD_STACK_SIZE = 1024 * 128;
00103 static const unsigned int MAX_GET_ATTEMPTS = 10;
00104 static const unsigned int GET_TIMEOUT = 5000;
00105
00106 friend class ApplicationPoolServer;
00107 struct Domain;
00108 struct AppContainer;
00109
00110 typedef shared_ptr<Domain> DomainPtr;
00111 typedef shared_ptr<AppContainer> AppContainerPtr;
00112 typedef list<AppContainerPtr> AppContainerList;
00113 typedef map<string, DomainPtr> DomainMap;
00114
00115 struct Domain {
00116 AppContainerList instances;
00117 unsigned int size;
00118 unsigned long maxRequests;
00119 FileChecker restartFileChecker;
00120 CachedFileStat alwaysRestartFileStatter;
00121
00122 Domain(const PoolOptions &options)
00123 : restartFileChecker(determineRestartDir(options) + "/restart.txt"),
00124 alwaysRestartFileStatter(determineRestartDir(options) + "/always_restart.txt")
00125 {
00126 }
00127
00128 private:
00129 static string determineRestartDir(const PoolOptions &options) {
00130 if (options.restartDir.empty()) {
00131 return options.appRoot + "/tmp";
00132 } else if (options.restartDir[0] == '/') {
00133 return options.restartDir;
00134 } else {
00135 return options.appRoot + "/" + options.restartDir;
00136 }
00137 }
00138 };
00139
00140 struct AppContainer {
00141 ApplicationPtr app;
00142 time_t startTime;
00143 time_t lastUsed;
00144 unsigned int sessions;
00145 unsigned int processed;
00146 AppContainerList::iterator iterator;
00147 AppContainerList::iterator ia_iterator;
00148
00149 AppContainer() {
00150 startTime = time(NULL);
00151 processed = 0;
00152 }
00153
00154
00155
00156
00157 string uptime() const {
00158 time_t seconds = time(NULL) - startTime;
00159 stringstream result;
00160
00161 if (seconds >= 60) {
00162 time_t minutes = seconds / 60;
00163 if (minutes >= 60) {
00164 time_t hours = minutes / 60;
00165 minutes = minutes % 60;
00166 result << hours << "h ";
00167 }
00168
00169 seconds = seconds % 60;
00170 result << minutes << "m ";
00171 }
00172 result << seconds << "s";
00173 return result.str();
00174 }
00175 };
00176
00177
00178
00179
00180
00181
00182
00183 struct SharedData {
00184 boost::mutex lock;
00185 condition activeOrMaxChanged;
00186
00187 DomainMap domains;
00188 unsigned int max;
00189 unsigned int count;
00190 unsigned int active;
00191 unsigned int maxPerApp;
00192 AppContainerList inactiveApps;
00193 map<string, unsigned int> appInstanceCount;
00194 };
00195
00196 typedef shared_ptr<SharedData> SharedDataPtr;
00197
00198
00199
00200
00201 struct SessionCloseCallback {
00202 SharedDataPtr data;
00203 weak_ptr<AppContainer> container;
00204
00205 SessionCloseCallback(SharedDataPtr data,
00206 const weak_ptr<AppContainer> &container) {
00207 this->data = data;
00208 this->container = container;
00209 }
00210
00211 void operator()() {
00212 boost::mutex::scoped_lock l(data->lock);
00213 AppContainerPtr container(this->container.lock());
00214
00215 if (container == NULL) {
00216 return;
00217 }
00218
00219 DomainMap::iterator it;
00220 it = data->domains.find(container->app->getAppRoot());
00221 if (it != data->domains.end()) {
00222 Domain *domain = it->second.get();
00223 AppContainerList *instances = &domain->instances;
00224
00225 container->processed++;
00226 if (domain->maxRequests > 0 && container->processed >= domain->maxRequests) {
00227 instances->erase(container->iterator);
00228 domain->size--;
00229 if (instances->empty()) {
00230 data->domains.erase(container->app->getAppRoot());
00231 }
00232 data->count--;
00233 data->active--;
00234 data->activeOrMaxChanged.notify_all();
00235 } else {
00236 container->lastUsed = time(NULL);
00237 container->sessions--;
00238 if (container->sessions == 0) {
00239 instances->erase(container->iterator);
00240 instances->push_front(container);
00241 container->iterator = instances->begin();
00242 data->inactiveApps.push_back(container);
00243 container->ia_iterator = data->inactiveApps.end();
00244 container->ia_iterator--;
00245 data->active--;
00246 data->activeOrMaxChanged.notify_all();
00247 }
00248 }
00249 }
00250 }
00251 };
00252
00253 #ifdef PASSENGER_USE_DUMMY_SPAWN_MANAGER
00254 DummySpawnManager spawnManager;
00255 #else
00256 SpawnManager spawnManager;
00257 #endif
00258 SharedDataPtr data;
00259 boost::thread *cleanerThread;
00260 bool detached;
00261 bool done;
00262 unsigned int maxIdleTime;
00263 unsigned int waitingOnGlobalQueue;
00264 condition cleanerThreadSleeper;
00265
00266
00267 boost::mutex &lock;
00268 condition &activeOrMaxChanged;
00269 DomainMap &domains;
00270 unsigned int &max;
00271 unsigned int &count;
00272 unsigned int &active;
00273 unsigned int &maxPerApp;
00274 AppContainerList &inactiveApps;
00275 map<string, unsigned int> &appInstanceCount;
00276
00277
00278
00279
00280 bool inline verifyState() {
00281 #if PASSENGER_DEBUG
00282
00283 DomainMap::const_iterator it;
00284 unsigned int totalSize = 0;
00285 for (it = domains.begin(); it != domains.end(); it++) {
00286 const string &appRoot = it->first;
00287 Domain *domain = it->second.get();
00288 AppContainerList *instances = &domain->instances;
00289
00290 P_ASSERT(domain->size <= count, false,
00291 "domains['" << appRoot << "'].size (" << domain->size <<
00292 ") <= count (" << count << ")");
00293 totalSize += domain->size;
00294
00295
00296
00297 P_ASSERT(!instances->empty(), false,
00298 "domains['" << appRoot << "'].instances is nonempty.");
00299
00300 AppContainerList::const_iterator prev_lit;
00301 AppContainerList::const_iterator lit;
00302 prev_lit = instances->begin();
00303 lit = prev_lit;
00304 lit++;
00305 for (; lit != instances->end(); lit++) {
00306 if ((*prev_lit)->sessions > 0) {
00307 P_ASSERT((*lit)->sessions > 0, false,
00308 "domains['" << appRoot << "'].instances "
00309 "is sorted from nonactive to active");
00310 }
00311 }
00312 }
00313 P_ASSERT(totalSize == count, false, "(sum of all d.size in domains) == count");
00314
00315 P_ASSERT(active <= count, false,
00316 "active (" << active << ") < count (" << count << ")");
00317 P_ASSERT(inactiveApps.size() == count - active, false,
00318 "inactive_apps.size() == count - active");
00319 #endif
00320 return true;
00321 }
00322
00323 string toStringWithoutLock() const {
00324 stringstream result;
00325
00326 result << "----------- General information -----------" << endl;
00327 result << "max = " << max << endl;
00328 result << "count = " << count << endl;
00329 result << "active = " << active << endl;
00330 result << "inactive = " << inactiveApps.size() << endl;
00331 result << "Waiting on global queue: " << waitingOnGlobalQueue << endl;
00332 result << endl;
00333
00334 result << "----------- Domains -----------" << endl;
00335 DomainMap::const_iterator it;
00336 for (it = domains.begin(); it != domains.end(); it++) {
00337 Domain *domain = it->second.get();
00338 AppContainerList *instances = &domain->instances;
00339 AppContainerList::const_iterator lit;
00340
00341 result << it->first << ": " << endl;
00342 for (lit = instances->begin(); lit != instances->end(); lit++) {
00343 AppContainer *container = lit->get();
00344 char buf[128];
00345
00346 snprintf(buf, sizeof(buf),
00347 "PID: %-5lu Sessions: %-2u Processed: %-5u Uptime: %s",
00348 (unsigned long) container->app->getPid(),
00349 container->sessions,
00350 container->processed,
00351 container->uptime().c_str());
00352 result << " " << buf << endl;
00353 }
00354 result << endl;
00355 }
00356 return result.str();
00357 }
00358
00359
00360
00361
00362
00363
00364
00365 bool needsRestart(const string &appRoot, Domain *domain, const PoolOptions &options) {
00366 return domain->alwaysRestartFileStatter.refresh(options.statThrottleRate) == 0
00367 || domain->restartFileChecker.changed(options.statThrottleRate);
00368 }
00369
00370 void cleanerThreadMainLoop() {
00371 this_thread::disable_syscall_interruption dsi;
00372 unique_lock<boost::mutex> l(lock);
00373 try {
00374 while (!done && !this_thread::interruption_requested()) {
00375 xtime xt;
00376 xtime_get(&xt, TIME_UTC);
00377 xt.sec += maxIdleTime + 1;
00378 if (cleanerThreadSleeper.timed_wait(l, xt)) {
00379
00380 if (done) {
00381
00382 break;
00383 } else {
00384
00385 continue;
00386 }
00387 }
00388
00389 time_t now = syscalls::time(NULL);
00390 AppContainerList::iterator it;
00391 for (it = inactiveApps.begin(); it != inactiveApps.end(); it++) {
00392 AppContainer &container(*it->get());
00393 ApplicationPtr app(container.app);
00394 Domain *domain = domains[app->getAppRoot()].get();
00395 AppContainerList *instances = &domain->instances;
00396
00397 if (maxIdleTime > 0 &&
00398 (now - container.lastUsed > (time_t) maxIdleTime)) {
00399 P_DEBUG("Cleaning idle app " << app->getAppRoot() <<
00400 " (PID " << app->getPid() << ")");
00401 instances->erase(container.iterator);
00402
00403 AppContainerList::iterator prev = it;
00404 prev--;
00405 inactiveApps.erase(it);
00406 it = prev;
00407
00408 domain->size--;
00409
00410 count--;
00411 }
00412 if (instances->empty()) {
00413 domains.erase(app->getAppRoot());
00414 }
00415 }
00416 }
00417 } catch (const exception &e) {
00418 P_ERROR("Uncaught exception: " << e.what());
00419 }
00420 }
00421
00422
00423
00424
00425
00426
00427
00428
00429 pair<AppContainerPtr, Domain *>
00430 spawnOrUseExisting(boost::mutex::scoped_lock &l, const PoolOptions &options) {
00431 beginning_of_function:
00432
00433 TRACE_POINT();
00434 this_thread::disable_interruption di;
00435 this_thread::disable_syscall_interruption dsi;
00436 const string &appRoot(options.appRoot);
00437 AppContainerPtr container;
00438 Domain *domain;
00439 AppContainerList *instances;
00440
00441 try {
00442 DomainMap::iterator it(domains.find(appRoot));
00443
00444 if (it != domains.end() && needsRestart(appRoot, it->second.get(), options)) {
00445 AppContainerList::iterator it2;
00446 instances = &it->second->instances;
00447 for (it2 = instances->begin(); it2 != instances->end(); it2++) {
00448 container = *it2;
00449 if (container->sessions == 0) {
00450 inactiveApps.erase(container->ia_iterator);
00451 } else {
00452 active--;
00453 }
00454 it2--;
00455 instances->erase(container->iterator);
00456 count--;
00457 }
00458 domains.erase(appRoot);
00459 spawnManager.reload(appRoot);
00460 it = domains.end();
00461 activeOrMaxChanged.notify_all();
00462 }
00463
00464 if (it != domains.end()) {
00465 domain = it->second.get();
00466 instances = &domain->instances;
00467
00468 if (instances->front()->sessions == 0) {
00469 container = instances->front();
00470 instances->pop_front();
00471 instances->push_back(container);
00472 container->iterator = instances->end();
00473 container->iterator--;
00474 inactiveApps.erase(container->ia_iterator);
00475 active++;
00476 activeOrMaxChanged.notify_all();
00477 } else if (count >= max || (
00478 maxPerApp != 0 && domain->size >= maxPerApp )
00479 ) {
00480 if (options.useGlobalQueue) {
00481 UPDATE_TRACE_POINT();
00482 waitingOnGlobalQueue++;
00483 activeOrMaxChanged.wait(l);
00484 waitingOnGlobalQueue--;
00485 goto beginning_of_function;
00486 } else {
00487 AppContainerList::iterator it(instances->begin());
00488 AppContainerList::iterator smallest(instances->begin());
00489 it++;
00490 for (; it != instances->end(); it++) {
00491 if ((*it)->sessions < (*smallest)->sessions) {
00492 smallest = it;
00493 }
00494 }
00495 container = *smallest;
00496 instances->erase(smallest);
00497 instances->push_back(container);
00498 container->iterator = instances->end();
00499 container->iterator--;
00500 }
00501 } else {
00502 container = ptr(new AppContainer());
00503 {
00504 this_thread::restore_interruption ri(di);
00505 this_thread::restore_syscall_interruption rsi(dsi);
00506 container->app = spawnManager.spawn(options);
00507 }
00508 container->sessions = 0;
00509 instances->push_back(container);
00510 container->iterator = instances->end();
00511 container->iterator--;
00512 domain->size++;
00513 count++;
00514 active++;
00515 activeOrMaxChanged.notify_all();
00516 }
00517 } else {
00518 if (active >= max) {
00519 UPDATE_TRACE_POINT();
00520 activeOrMaxChanged.wait(l);
00521 goto beginning_of_function;
00522 } else if (count == max) {
00523 container = inactiveApps.front();
00524 inactiveApps.pop_front();
00525 domain = domains[container->app->getAppRoot()].get();
00526 instances = &domain->instances;
00527 instances->erase(container->iterator);
00528 if (instances->empty()) {
00529 domains.erase(container->app->getAppRoot());
00530 } else {
00531 domain->size--;
00532 }
00533 count--;
00534 }
00535
00536 UPDATE_TRACE_POINT();
00537 container = ptr(new AppContainer());
00538 {
00539 this_thread::restore_interruption ri(di);
00540 this_thread::restore_syscall_interruption rsi(dsi);
00541 container->app = spawnManager.spawn(options);
00542 }
00543 container->sessions = 0;
00544 it = domains.find(appRoot);
00545 if (it == domains.end()) {
00546 domain = new Domain(options);
00547 domain->size = 1;
00548 domain->maxRequests = options.maxRequests;
00549 domains[appRoot] = ptr(domain);
00550 } else {
00551 domain = it->second.get();
00552 domain->size++;
00553 }
00554 instances = &domain->instances;
00555 instances->push_back(container);
00556 container->iterator = instances->end();
00557 container->iterator--;
00558 count++;
00559 active++;
00560 activeOrMaxChanged.notify_all();
00561 }
00562 } catch (const SpawnException &e) {
00563 string message("Cannot spawn application '");
00564 message.append(appRoot);
00565 message.append("': ");
00566 message.append(e.what());
00567 if (e.hasErrorPage()) {
00568 throw SpawnException(message, e.getErrorPage());
00569 } else {
00570 throw SpawnException(message);
00571 }
00572 } catch (const exception &e) {
00573 string message("Cannot spawn application '");
00574 message.append(appRoot);
00575 message.append("': ");
00576 message.append(e.what());
00577 throw SpawnException(message);
00578 }
00579
00580 return make_pair(container, domain);
00581 }
00582
00583 public:
00584
00585
00586
00587
00588
00589
00590
00591
00592
00593
00594
00595
00596
00597
00598
00599
00600
00601
00602
00603 StandardApplicationPool(const string &spawnServerCommand,
00604 const string &logFile = "",
00605 const string &rubyCommand = "ruby",
00606 const string &user = "")
00607 :
00608 #ifndef PASSENGER_USE_DUMMY_SPAWN_MANAGER
00609 spawnManager(spawnServerCommand, logFile, rubyCommand, user),
00610 #endif
00611 data(new SharedData()),
00612 lock(data->lock),
00613 activeOrMaxChanged(data->activeOrMaxChanged),
00614 domains(data->domains),
00615 max(data->max),
00616 count(data->count),
00617 active(data->active),
00618 maxPerApp(data->maxPerApp),
00619 inactiveApps(data->inactiveApps),
00620 appInstanceCount(data->appInstanceCount)
00621 {
00622 TRACE_POINT();
00623 detached = false;
00624 done = false;
00625 max = DEFAULT_MAX_POOL_SIZE;
00626 count = 0;
00627 active = 0;
00628 waitingOnGlobalQueue = 0;
00629 maxPerApp = DEFAULT_MAX_INSTANCES_PER_APP;
00630 maxIdleTime = DEFAULT_MAX_IDLE_TIME;
00631 cleanerThread = new boost::thread(
00632 bind(&StandardApplicationPool::cleanerThreadMainLoop, this),
00633 CLEANER_THREAD_STACK_SIZE
00634 );
00635 }
00636
00637 virtual ~StandardApplicationPool() {
00638 if (!detached) {
00639 this_thread::disable_interruption di;
00640 {
00641 boost::mutex::scoped_lock l(lock);
00642 done = true;
00643 cleanerThreadSleeper.notify_one();
00644 }
00645 cleanerThread->join();
00646 }
00647 delete cleanerThread;
00648 }
00649
00650 virtual Application::SessionPtr get(const string &appRoot) {
00651 return ApplicationPool::get(appRoot);
00652 }
00653
00654 virtual Application::SessionPtr get(const PoolOptions &options) {
00655 TRACE_POINT();
00656 using namespace boost::posix_time;
00657 unsigned int attempt = 0;
00658
00659
00660
00661 unique_lock<boost::mutex> l(lock);
00662
00663 while (true) {
00664 attempt++;
00665
00666 pair<AppContainerPtr, Domain *> p(
00667 spawnOrUseExisting(l, options)
00668 );
00669 AppContainerPtr &container = p.first;
00670 Domain *domain = p.second;
00671
00672 container->lastUsed = time(NULL);
00673 container->sessions++;
00674
00675 P_ASSERT(verifyState(), Application::SessionPtr(),
00676 "State is valid:\n" << toString(false));
00677 try {
00678 UPDATE_TRACE_POINT();
00679 return container->app->connect(SessionCloseCallback(data, container));
00680 } catch (const exception &e) {
00681 container->sessions--;
00682
00683 AppContainerList &instances(domain->instances);
00684 instances.erase(container->iterator);
00685 domain->size--;
00686 if (instances.empty()) {
00687 domains.erase(options.appRoot);
00688 }
00689 count--;
00690 active--;
00691 activeOrMaxChanged.notify_all();
00692 P_ASSERT(verifyState(), Application::SessionPtr(),
00693 "State is valid: " << toString(false));
00694 if (attempt == MAX_GET_ATTEMPTS) {
00695 string message("Cannot connect to an existing "
00696 "application instance for '");
00697 message.append(options.appRoot);
00698 message.append("': ");
00699 try {
00700 const SystemException &syse =
00701 dynamic_cast<const SystemException &>(e);
00702 message.append(syse.sys());
00703 } catch (const bad_cast &) {
00704 message.append(e.what());
00705 }
00706 throw IOException(message);
00707 }
00708 }
00709 }
00710
00711 return Application::SessionPtr();
00712 }
00713
00714 virtual void clear() {
00715 boost::mutex::scoped_lock l(lock);
00716 domains.clear();
00717 inactiveApps.clear();
00718 appInstanceCount.clear();
00719 count = 0;
00720 active = 0;
00721 activeOrMaxChanged.notify_all();
00722 }
00723
00724 virtual void setMaxIdleTime(unsigned int seconds) {
00725 boost::mutex::scoped_lock l(lock);
00726 maxIdleTime = seconds;
00727 cleanerThreadSleeper.notify_one();
00728 }
00729
00730 virtual void setMax(unsigned int max) {
00731 boost::mutex::scoped_lock l(lock);
00732 this->max = max;
00733 activeOrMaxChanged.notify_all();
00734 }
00735
00736 virtual unsigned int getActive() const {
00737 return active;
00738 }
00739
00740 virtual unsigned int getCount() const {
00741 return count;
00742 }
00743
00744 virtual void setMaxPerApp(unsigned int maxPerApp) {
00745 boost::mutex::scoped_lock l(lock);
00746 this->maxPerApp = maxPerApp;
00747 activeOrMaxChanged.notify_all();
00748 }
00749
00750 virtual pid_t getSpawnServerPid() const {
00751 return spawnManager.getServerPid();
00752 }
00753
00754
00755
00756
00757
00758 virtual string toString(bool lockMutex = true) const {
00759 if (lockMutex) {
00760 unique_lock<boost::mutex> l(lock);
00761 return toStringWithoutLock();
00762 } else {
00763 return toStringWithoutLock();
00764 }
00765 }
00766
00767
00768
00769
00770
00771 virtual string toXml() const {
00772 unique_lock<boost::mutex> l(lock);
00773 stringstream result;
00774 DomainMap::const_iterator it;
00775
00776 result << "<?xml version=\"1.0\" encoding=\"iso8859-1\" ?>\n";
00777 result << "<info>";
00778
00779 result << "<domains>";
00780 for (it = domains.begin(); it != domains.end(); it++) {
00781 Domain *domain = it->second.get();
00782 AppContainerList *instances = &domain->instances;
00783 AppContainerList::const_iterator lit;
00784
00785 result << "<domain>";
00786 result << "<name>" << escapeForXml(it->first) << "</name>";
00787
00788 result << "<instances>";
00789 for (lit = instances->begin(); lit != instances->end(); lit++) {
00790 AppContainer *container = lit->get();
00791
00792 result << "<instance>";
00793 result << "<pid>" << container->app->getPid() << "</pid>";
00794 result << "<sessions>" << container->sessions << "</sessions>";
00795 result << "<processed>" << container->processed << "</processed>";
00796 result << "<uptime>" << container->uptime() << "</uptime>";
00797 result << "</instance>";
00798 }
00799 result << "</instances>";
00800
00801 result << "</domain>";
00802 }
00803 result << "</domains>";
00804
00805 result << "</info>";
00806 return result.str();
00807 }
00808 };
00809
00810 typedef shared_ptr<StandardApplicationPool> StandardApplicationPoolPtr;
00811
00812 }
00813
00814 #endif
00815