ext/common/LoggingAgent/LoggingServer.h in passenger-3.0.6 vs ext/common/LoggingAgent/LoggingServer.h in passenger-3.0.7

- old
+ new

@@ -42,11 +42,10 @@ #include <ctime> #include <cassert> #include "DataStoreId.h" #include "RemoteSender.h" -#include "ChangeNotifier.h" #include "FilterSupport.h" #include "../EventedMessageServer.h" #include "../MessageReadersWriters.h" #include "../StaticString.h" #include "../Exceptions.h" @@ -119,11 +118,11 @@ return false; } virtual void append(const DataStoreId &dataStoreId, const StaticString &data) = 0; - virtual void flush() { } + virtual bool flush() { return true; } virtual void dump(ostream &stream) const { } }; struct LogFile: public LogSink { static const unsigned int BUFFER_CAPACITY = 8 * 1024; @@ -131,16 +130,10 @@ string filename; FileDescriptor fd; char buffer[BUFFER_CAPACITY]; unsigned int bufferSize; - /** - * Contains every (groupName, nodeName, category) tuple for - * which their data is currently buffered in this sink. - */ - set<DataStoreId> dataStoreIds; - LogFile(LoggingServer *server, const string &filename, mode_t filePermissions) : LogSink(server) { int ret; @@ -161,47 +154,33 @@ virtual ~LogFile() { flush(); } - void notifyChanges() { - if (server->changeNotifier != NULL) { - set<DataStoreId>::const_iterator it; - set<DataStoreId>::const_iterator end = dataStoreIds.end(); - - for (it = dataStoreIds.begin(); it != dataStoreIds.end(); it++) { - server->changeNotifier->changed(*it); - } - } - dataStoreIds.clear(); - } - virtual void append(const DataStoreId &dataStoreId, const StaticString &data) { - if (server->changeNotifier != NULL) { - dataStoreIds.insert(dataStoreId); - } if (bufferSize + data.size() > BUFFER_CAPACITY) { StaticString data2[2]; data2[0] = StaticString(buffer, bufferSize); data2[1] = data; gatheredWrite(fd, data2, 2); lastFlushed = ev_now(server->getLoop()); bufferSize = 0; - notifyChanges(); } else { memcpy(buffer + bufferSize, data.data(), data.size()); bufferSize += data.size(); } } - virtual void flush() { + virtual bool flush() { if (bufferSize > 0) { lastFlushed = ev_now(server->getLoop()); writeExact(fd, buffer, bufferSize); bufferSize = 0; - notifyChanges(); + return true; + } else { + return false; } } virtual void dump(ostream &stream) const { stream << " Log file: file=" << filename << ", " @@ -268,17 +247,20 @@ memcpy(buffer + bufferSize, data.data(), data.size()); bufferSize += data.size(); } } - virtual void flush() { + virtual bool flush() { if (bufferSize > 0) { lastFlushed = ev_now(server->getLoop()); StaticString data(buffer, bufferSize); server->remoteSender.schedule(unionStationKey, nodeName, category, &data, 1); bufferSize = 0; + return true; + } else { + return false; } } virtual void dump(ostream &stream) const { stream << " Remote sink: " @@ -374,12 +356,11 @@ typedef shared_ptr<Transaction> TransactionPtr; enum ClientType { UNINITIALIZED, - LOGGER, - WATCHER + LOGGER }; struct Client: public EventedMessageClient { string nodeName; ClientType type; @@ -409,11 +390,10 @@ string dir; gid_t gid; string dirPermissions; mode_t filePermissions; RemoteSender remoteSender; - ChangeNotifierPtr changeNotifier; ev::timer garbageCollectionTimer; ev::timer sinkFlushingTimer; ev::timer exitTimer; TransactionMap transactions; LogSinkCache logSinkCache; @@ -818,62 +798,10 @@ current++; } return true; } - bool getLastEntryInDirectory(const string &path, string &result) const { - DIR *dir = opendir(path.c_str()); - struct dirent *entry; - vector<string> subdirs; - - if (dir == NULL) { - int e = errno; - throw FileSystemException("Cannot open directory " + path, - e, path); - } - while ((entry = readdir(dir)) != NULL) { - if (isDirectory(path, entry) && looksLikeNumber(entry->d_name)) { - subdirs.push_back(entry->d_name); - } - } - closedir(dir); - - if (subdirs.empty()) { - return false; - } - - vector<string>::const_iterator it = subdirs.begin(); - vector<string>::const_iterator end = subdirs.end(); - vector<string>::const_iterator largest_it = subdirs.begin(); - int largest = atoi(subdirs[0]); - for (it++; it != end; it++) { - const string &subdir = *it; - int number = atoi(subdir.c_str()); - if (number > largest) { - largest_it = it; - largest = number; - } - } - result = *largest_it; - return true; - } - - static void pendingDataFlushed(EventedClient *_client) { - Client *client = (Client *) _client; - LoggingServer *self = (LoggingServer *) client->userData; - - client->onPendingDataFlushed = NULL; - if (OXT_UNLIKELY( client->type != WATCHER )) { - P_WARN("BUG: pendingDataFlushed() called even though client type is not WATCHER."); - client->disconnect(); - } else if (self->changeNotifier != NULL) { - self->changeNotifier->addClient(client->detach()); - } else { - client->disconnect(); - } - } - /* Release all inactive log sinks that have been inactive for more than * GARBAGE_COLLECTION_TIMEOUT seconds. */ void releaseInactiveLogSinks(ev_tstamp now) { bool done = false; @@ -1158,30 +1086,10 @@ toHex(StaticString((const char *) digest, MD5_SIZE), client->nodeId); client->type = LOGGER; client->writeArrayMessage("ok", NULL); - } else if (args[0] == "watchChanges") { - if (OXT_UNLIKELY( !checkWhetherConnectionAreAcceptable(client) )) { - return true; - } - if (OXT_UNLIKELY( client->type != UNINITIALIZED )) { - sendErrorToClient(client, "This command cannot be invoked " - "if the 'init' command is already invoked."); - client->disconnect(); - return true; - } - - client->type = WATCHER; - client->notifyReads(false); - discardReadData(); - - // Add to the change notifier after all pending data - // has been written out. - client->onPendingDataFlushed = pendingDataFlushed; - client->writeArrayMessage("ok", NULL); - } else if (args[0] == "flush") { flushAllSinks(); client->writeArrayMessage("ok", NULL); } else if (args[0] == "info") { @@ -1332,82 +1240,21 @@ transaction->discard(); } } // Invoke destructors, causing all transactions and log sinks to - // be flushed before RemoteSender and ChangeNotifier are being - // destroyed. + // be flushed before RemoteSender is being destroyed. transactions.clear(); logSinkCache.clear(); inactiveLogSinks.clear(); } - void setChangeNotifier(const ChangeNotifierPtr &_changeNotifier) { - changeNotifier = _changeNotifier; - changeNotifier->getLastPos = boost::bind(&LoggingServer::getLastPos, - this, _1, _2, _3); - } - - string getLastPos(const StaticString &groupName, const StaticString &nodeName, - const StaticString &category) const - { - md5_state_t state; - md5_byte_t digest[MD5_SIZE]; - char nodeId[MD5_HEX_SIZE]; - md5_init(&state); - md5_append(&state, (const md5_byte_t *) nodeName.data(), nodeName.size()); - md5_finish(&state, digest); - toHex(StaticString((const char *) digest, MD5_SIZE), nodeId); - - string dir = determineFilename(groupName, nodeId, category); - string subdir, component; - subdir.reserve(13); // It's a string that looks like: "2010/06/24/12" - - try { - // Loop 4 times to process year, month, day, hour. - for (int i = 0; i < 4; i++) { - bool found = getLastEntryInDirectory(dir, component); - if (!found) { - return string(); - } - dir.append("/"); - dir.append(component); - if (i != 0) { - subdir.append("/"); - } - subdir.append(component); - } - // After the loop, new dir == old dir + "/" + subdir - } catch (const SystemException &e) { - if (e.code() == ENOENT) { - return string(); - } else { - throw; - } - } - - string &filename = dir; - filename.append("/log.txt"); - - struct stat buf; - if (stat(filename.c_str(), &buf) == -1) { - if (errno == ENOENT) { - return string(); - } else { - int e = errno; - throw FileSystemException("Cannot stat() " + filename, e, - filename); - } - } else { - return subdir + "/" + toString(buf.st_size); - } - } - void dump(ostream &stream) const { TransactionMap::const_iterator it; TransactionMap::const_iterator end = transactions.end(); - stream << "Number of clients: " << getClients().size() << "\n"; + stream << "Number of clients : " << getClients().size() << "\n"; + stream << "RemoteSender queue: " << remoteSender.queued() << " items\n"; stream << "Open transactions: " << transactions.size() << "\n"; for (it = transactions.begin(); it != end; it++) { const TransactionPtr &transaction = it->second; transaction->dump(stream); }