/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. */ #include #include "FacebookBase.h" #include "ServiceTracker.h" #include "concurrency/ThreadManager.h" using namespace std; using namespace facebook::fb303; using namespace apache::thrift::concurrency; uint64_t ServiceTracker::CHECKPOINT_MINIMUM_INTERVAL_SECONDS = 60; int ServiceTracker::LOG_LEVEL = 5; ServiceTracker::ServiceTracker(facebook::fb303::FacebookBase *handler, void (*logMethod)(int, const string &), bool featureCheckpoint, bool featureStatusCheck, bool featureThreadCheck, Stopwatch::Unit stopwatchUnit) : handler_(handler), logMethod_(logMethod), featureCheckpoint_(featureCheckpoint), featureStatusCheck_(featureStatusCheck), featureThreadCheck_(featureThreadCheck), stopwatchUnit_(stopwatchUnit), checkpointServices_(0) { if (featureCheckpoint_) { time_t now = time(NULL); checkpointTime_ = now; } else { checkpointTime_ = 0; } } /** * Registers the beginning of a "service method": basically, any of * the implementations of Thrift remote procedure calls that a * FacebookBase handler is handling. Controls concurrent * services and reports statistics (via log and via fb303 counters). * Throws an exception if the server is not ready to handle service * methods yet. * * note: The relationship between startService() and finishService() * is currently defined so that a call to finishService() should only * be matched to this call to startService() if this method returns * without exception. It wouldn't be a problem to implement things * the other way, so that *every* start needed a finish, but this * convention was chosen to match the way an object's constructor and * destructor work together, i.e. to work well with ServiceMethod * objects. * * @param const ServiceMethod &serviceMethod A reference to the ServiceMethod * object instantiated at the start * of the service method. */ void ServiceTracker::startService(const ServiceMethod &serviceMethod) { // note: serviceMethod.timer_ automatically starts at construction. // log service start logMethod_(5, serviceMethod.signature_); // check handler ready if (featureStatusCheck_ && !serviceMethod.featureLogOnly_) { // note: Throwing exceptions before counting statistics. See note // in method header. // note: A STOPPING server is not accepting new connections, but it // is still handling any already-connected threads -- so from the // service method's point of view, a status of STOPPING is a green // light. facebook::fb303::fb_status status = handler_->getStatus(); if (status != facebook::fb303::ALIVE && status != facebook::fb303::STOPPING) { if (status == facebook::fb303::STARTING) { throw ServiceException("Server starting up; please try again later"); } else { throw ServiceException("Server not alive; please try again later"); } } } // check server threads if (featureThreadCheck_ && !serviceMethod.featureLogOnly_) { // note: Might want to put these messages in reportCheckpoint() if // log is getting spammed. if (threadManager_ != NULL) { size_t idle_count = threadManager_->idleWorkerCount(); if (idle_count == 0) { stringstream message; message << "service " << serviceMethod.signature_ << ": all threads (" << threadManager_->workerCount() << ") in use"; logMethod_(3, message.str()); } } } } /** * Logs a significant step in the middle of a "service method"; see * startService. * * @param const ServiceMethod &serviceMethod A reference to the ServiceMethod * object instantiated at the start * of the service method. * @return int64_t Elapsed units (see stopwatchUnit_) since ServiceMethod * instantiation. */ int64_t ServiceTracker::stepService(const ServiceMethod &serviceMethod, const string &stepName) { stringstream message; string elapsed_label; int64_t elapsed = serviceMethod.timer_.elapsedUnits(stopwatchUnit_, &elapsed_label); message << serviceMethod.signature_ << ' ' << stepName << " [" << elapsed_label << ']'; logMethod_(5, message.str()); return elapsed; } /** * Registers the end of a "service method"; see startService(). * * @param const ServiceMethod &serviceMethod A reference to the ServiceMethod * object instantiated at the start * of the service method. */ void ServiceTracker::finishService(const ServiceMethod &serviceMethod) { // log end of service stringstream message; string duration_label; int64_t duration = serviceMethod.timer_.elapsedUnits(stopwatchUnit_, &duration_label); message << serviceMethod.signature_ << " finish [" << duration_label << ']'; logMethod_(5, message.str()); // count, record, and maybe report service statistics if (!serviceMethod.featureLogOnly_) { if (!featureCheckpoint_) { // lifetime counters // (note: No need to lock statisticsMutex_ if not doing checkpoint; // FacebookService::incrementCounter() is already thread-safe.) handler_->incrementCounter("lifetime_services"); } else { statisticsMutex_.lock(); // note: No exceptions expected from this code block. Wrap in a try // just to be safe. try { // lifetime counters // note: Good to synchronize this with the increment of // checkpoint services, even though incrementCounter() is // already thread-safe, for the sake of checkpoint reporting // consistency (i.e. since the last checkpoint, // lifetime_services has incremented by checkpointServices_). handler_->incrementCounter("lifetime_services"); // checkpoint counters checkpointServices_++; checkpointDuration_ += duration; // per-service timing // note kjv: According to my tests it is very slightly faster to // call insert() once (and detect not-found) than calling find() // and then maybe insert (if not-found). However, the difference // is tiny for small maps like this one, and the code for the // faster solution is slightly less readable. Also, I wonder if // the instantiation of the (often unused) pair to insert makes // the first algorithm slower after all. map >::iterator iter; iter = checkpointServiceDuration_.find(serviceMethod.name_); if (iter != checkpointServiceDuration_.end()) { iter->second.first++; iter->second.second += duration; } else { checkpointServiceDuration_.insert(make_pair(serviceMethod.name_, make_pair(1, duration))); } // maybe report checkpoint // note: ...if it's been long enough since the last report. time_t now = time(NULL); uint64_t check_interval = now - checkpointTime_; if (check_interval >= CHECKPOINT_MINIMUM_INTERVAL_SECONDS) { reportCheckpoint(); } } catch (...) { statisticsMutex_.unlock(); throw; } statisticsMutex_.unlock(); } } } /** * Logs some statistics gathered since the last call to this method. * * note: Thread race conditions on this method could cause * misreporting and/or undefined behavior; the caller must protect * uses of the object variables (and calls to this method) with a * mutex. * */ void ServiceTracker::reportCheckpoint() { time_t now = time(NULL); uint64_t check_count = checkpointServices_; uint64_t check_interval = now - checkpointTime_; uint64_t check_duration = checkpointDuration_; // export counters for timing of service methods (by service name) handler_->setCounter("checkpoint_time", check_interval); map >::iterator iter; uint64_t count; for (iter = checkpointServiceDuration_.begin(); iter != checkpointServiceDuration_.end(); iter++) { count = iter->second.first; handler_->setCounter(string("checkpoint_count_") + iter->first, count); if (count == 0) { handler_->setCounter(string("checkpoint_speed_") + iter->first, 0); } else { handler_->setCounter(string("checkpoint_speed_") + iter->first, iter->second.second / count); } } // reset checkpoint variables // note: Clearing the map while other threads are using it might // cause undefined behavior. checkpointServiceDuration_.clear(); checkpointTime_ = now; checkpointServices_ = 0; checkpointDuration_ = 0; // get lifetime variables uint64_t life_count = handler_->getCounter("lifetime_services"); uint64_t life_interval = now - handler_->aliveSince(); // log checkpoint stringstream message; message << "checkpoint_time:" << check_interval << " checkpoint_services:" << check_count << " checkpoint_speed_sum:" << check_duration << " lifetime_time:" << life_interval << " lifetime_services:" << life_count; if (featureThreadCheck_ && threadManager_ != NULL) { size_t worker_count = threadManager_->workerCount(); size_t idle_count = threadManager_->idleWorkerCount(); message << " total_workers:" << worker_count << " active_workers:" << (worker_count - idle_count); } logMethod_(4, message.str()); } /** * Remembers the thread manager used in the server, for monitoring thread * activity. * * @param shared_ptr threadManager The server's thread manager. */ void ServiceTracker::setThreadManager(boost::shared_ptr threadManager) { threadManager_ = threadManager; } /** * Logs messages to stdout; the passed message will be logged if the * passed level is less than or equal to LOG_LEVEL. * * This is the default logging method used by the ServiceTracker. An * alternate logging method (that accepts the same parameters) may be * specified to the constructor. * * @param int level A level associated with the message: higher levels * are used to indicate higher levels of detail. * @param string message The message to log. */ void ServiceTracker::defaultLogMethod(int level, const string &message) { if (level <= LOG_LEVEL) { string level_string; time_t now = time(NULL); char now_pretty[26]; ctime_r(&now, now_pretty); now_pretty[24] = '\0'; switch (level) { case 1: level_string = "CRITICAL"; break; case 2: level_string = "ERROR"; break; case 3: level_string = "WARNING"; break; case 5: level_string = "DEBUG"; break; case 4: default: level_string = "INFO"; break; } cout << '[' << level_string << "] [" << now_pretty << "] " << message << endl; } } /** * Creates a Stopwatch, which can report the time elapsed since its * creation. * */ Stopwatch::Stopwatch() { gettimeofday(&startTime_, NULL); } void Stopwatch::reset() { gettimeofday(&startTime_, NULL); } uint64_t Stopwatch::elapsedUnits(Stopwatch::Unit unit, string *label) const { timeval now_time; gettimeofday(&now_time, NULL); time_t duration_secs = now_time.tv_sec - startTime_.tv_sec; uint64_t duration_units; switch (unit) { case UNIT_SECONDS: duration_units = duration_secs + (now_time.tv_usec - startTime_.tv_usec + 500000) / 1000000; if (NULL != label) { stringstream ss_label; ss_label << duration_units << " secs"; label->assign(ss_label.str()); } break; case UNIT_MICROSECONDS: duration_units = duration_secs * 1000000 + now_time.tv_usec - startTime_.tv_usec; if (NULL != label) { stringstream ss_label; ss_label << duration_units << " us"; label->assign(ss_label.str()); } break; case UNIT_MILLISECONDS: default: duration_units = duration_secs * 1000 + (now_time.tv_usec - startTime_.tv_usec + 500) / 1000; if (NULL != label) { stringstream ss_label; ss_label << duration_units << " ms"; label->assign(ss_label.str()); } break; } return duration_units; } /** * Creates a ServiceMethod, used for tracking a single service method * invocation (via the ServiceTracker). The passed name of the * ServiceMethod is used to group statistics (e.g. counts and durations) * for similar invocations; the passed signature is used to uniquely * identify the particular invocation in the log. * * note: A version of this constructor is provided that automatically * forms a signature the name and a passed numeric id. Silly, sure, * but commonly used, since it often saves the caller a line or two of * code. * * @param ServiceTracker *tracker The service tracker that will track this * ServiceMethod. * @param const string &name The service method name (usually independent * of service method parameters). * @param const string &signature A signature uniquely identifying the method * invocation (usually name plus parameters). */ ServiceMethod::ServiceMethod(ServiceTracker *tracker, const string &name, const string &signature, bool featureLogOnly) : tracker_(tracker), name_(name), signature_(signature), featureLogOnly_(featureLogOnly) { // note: timer_ automatically starts at construction. // invoke tracker to start service // note: Might throw. If it throws, then this object's destructor // won't be called, which is according to plan: finishService() is // only supposed to be matched to startService() if startService() // returns without error. tracker_->startService(*this); } ServiceMethod::ServiceMethod(ServiceTracker *tracker, const string &name, uint64_t id, bool featureLogOnly) : tracker_(tracker), name_(name), featureLogOnly_(featureLogOnly) { // note: timer_ automatically starts at construction. stringstream ss_signature; ss_signature << name << " (" << id << ')'; signature_ = ss_signature.str(); // invoke tracker to start service // note: Might throw. If it throws, then this object's destructor // won't be called, which is according to plan: finishService() is // only supposed to be matched to startService() if startService() // returns without error. tracker_->startService(*this); } ServiceMethod::~ServiceMethod() { // invoke tracker to finish service // note: Not expecting an exception from this code, but // finishService() might conceivably throw an out-of-memory // exception. try { tracker_->finishService(*this); } catch (...) { // don't throw } } uint64_t ServiceMethod::step(const std::string &stepName) { return tracker_->stepService(*this, stepName); }