ext/common/LoggingAgent/LoggingServer.h in passenger-3.0.6 vs ext/common/LoggingAgent/LoggingServer.h in passenger-3.0.7
- old
+ new
@@ -42,11 +42,10 @@
#include <ctime>
#include <cassert>
#include "DataStoreId.h"
#include "RemoteSender.h"
-#include "ChangeNotifier.h"
#include "FilterSupport.h"
#include "../EventedMessageServer.h"
#include "../MessageReadersWriters.h"
#include "../StaticString.h"
#include "../Exceptions.h"
@@ -119,11 +118,11 @@
return false;
}
virtual void append(const DataStoreId &dataStoreId,
const StaticString &data) = 0;
- virtual void flush() { }
+ virtual bool flush() { return true; }
virtual void dump(ostream &stream) const { }
};
struct LogFile: public LogSink {
static const unsigned int BUFFER_CAPACITY = 8 * 1024;
@@ -131,16 +130,10 @@
string filename;
FileDescriptor fd;
char buffer[BUFFER_CAPACITY];
unsigned int bufferSize;
- /**
- * Contains every (groupName, nodeName, category) tuple for
- * which their data is currently buffered in this sink.
- */
- set<DataStoreId> dataStoreIds;
-
LogFile(LoggingServer *server, const string &filename, mode_t filePermissions)
: LogSink(server)
{
int ret;
@@ -161,47 +154,33 @@
virtual ~LogFile() {
flush();
}
- void notifyChanges() {
- if (server->changeNotifier != NULL) {
- set<DataStoreId>::const_iterator it;
- set<DataStoreId>::const_iterator end = dataStoreIds.end();
-
- for (it = dataStoreIds.begin(); it != dataStoreIds.end(); it++) {
- server->changeNotifier->changed(*it);
- }
- }
- dataStoreIds.clear();
- }
-
virtual void append(const DataStoreId &dataStoreId, const StaticString &data) {
- if (server->changeNotifier != NULL) {
- dataStoreIds.insert(dataStoreId);
- }
if (bufferSize + data.size() > BUFFER_CAPACITY) {
StaticString data2[2];
data2[0] = StaticString(buffer, bufferSize);
data2[1] = data;
gatheredWrite(fd, data2, 2);
lastFlushed = ev_now(server->getLoop());
bufferSize = 0;
- notifyChanges();
} else {
memcpy(buffer + bufferSize, data.data(), data.size());
bufferSize += data.size();
}
}
- virtual void flush() {
+ virtual bool flush() {
if (bufferSize > 0) {
lastFlushed = ev_now(server->getLoop());
writeExact(fd, buffer, bufferSize);
bufferSize = 0;
- notifyChanges();
+ return true;
+ } else {
+ return false;
}
}
virtual void dump(ostream &stream) const {
stream << " Log file: file=" << filename << ", "
@@ -268,17 +247,20 @@
memcpy(buffer + bufferSize, data.data(), data.size());
bufferSize += data.size();
}
}
- virtual void flush() {
+ virtual bool flush() {
if (bufferSize > 0) {
lastFlushed = ev_now(server->getLoop());
StaticString data(buffer, bufferSize);
server->remoteSender.schedule(unionStationKey, nodeName,
category, &data, 1);
bufferSize = 0;
+ return true;
+ } else {
+ return false;
}
}
virtual void dump(ostream &stream) const {
stream << " Remote sink: "
@@ -374,12 +356,11 @@
typedef shared_ptr<Transaction> TransactionPtr;
enum ClientType {
UNINITIALIZED,
- LOGGER,
- WATCHER
+ LOGGER
};
struct Client: public EventedMessageClient {
string nodeName;
ClientType type;
@@ -409,11 +390,10 @@
string dir;
gid_t gid;
string dirPermissions;
mode_t filePermissions;
RemoteSender remoteSender;
- ChangeNotifierPtr changeNotifier;
ev::timer garbageCollectionTimer;
ev::timer sinkFlushingTimer;
ev::timer exitTimer;
TransactionMap transactions;
LogSinkCache logSinkCache;
@@ -818,62 +798,10 @@
current++;
}
return true;
}
- bool getLastEntryInDirectory(const string &path, string &result) const {
- DIR *dir = opendir(path.c_str());
- struct dirent *entry;
- vector<string> subdirs;
-
- if (dir == NULL) {
- int e = errno;
- throw FileSystemException("Cannot open directory " + path,
- e, path);
- }
- while ((entry = readdir(dir)) != NULL) {
- if (isDirectory(path, entry) && looksLikeNumber(entry->d_name)) {
- subdirs.push_back(entry->d_name);
- }
- }
- closedir(dir);
-
- if (subdirs.empty()) {
- return false;
- }
-
- vector<string>::const_iterator it = subdirs.begin();
- vector<string>::const_iterator end = subdirs.end();
- vector<string>::const_iterator largest_it = subdirs.begin();
- int largest = atoi(subdirs[0]);
- for (it++; it != end; it++) {
- const string &subdir = *it;
- int number = atoi(subdir.c_str());
- if (number > largest) {
- largest_it = it;
- largest = number;
- }
- }
- result = *largest_it;
- return true;
- }
-
- static void pendingDataFlushed(EventedClient *_client) {
- Client *client = (Client *) _client;
- LoggingServer *self = (LoggingServer *) client->userData;
-
- client->onPendingDataFlushed = NULL;
- if (OXT_UNLIKELY( client->type != WATCHER )) {
- P_WARN("BUG: pendingDataFlushed() called even though client type is not WATCHER.");
- client->disconnect();
- } else if (self->changeNotifier != NULL) {
- self->changeNotifier->addClient(client->detach());
- } else {
- client->disconnect();
- }
- }
-
/* Release all inactive log sinks that have been inactive for more than
* GARBAGE_COLLECTION_TIMEOUT seconds.
*/
void releaseInactiveLogSinks(ev_tstamp now) {
bool done = false;
@@ -1158,30 +1086,10 @@
toHex(StaticString((const char *) digest, MD5_SIZE), client->nodeId);
client->type = LOGGER;
client->writeArrayMessage("ok", NULL);
- } else if (args[0] == "watchChanges") {
- if (OXT_UNLIKELY( !checkWhetherConnectionAreAcceptable(client) )) {
- return true;
- }
- if (OXT_UNLIKELY( client->type != UNINITIALIZED )) {
- sendErrorToClient(client, "This command cannot be invoked "
- "if the 'init' command is already invoked.");
- client->disconnect();
- return true;
- }
-
- client->type = WATCHER;
- client->notifyReads(false);
- discardReadData();
-
- // Add to the change notifier after all pending data
- // has been written out.
- client->onPendingDataFlushed = pendingDataFlushed;
- client->writeArrayMessage("ok", NULL);
-
} else if (args[0] == "flush") {
flushAllSinks();
client->writeArrayMessage("ok", NULL);
} else if (args[0] == "info") {
@@ -1332,82 +1240,21 @@
transaction->discard();
}
}
// Invoke destructors, causing all transactions and log sinks to
- // be flushed before RemoteSender and ChangeNotifier are being
- // destroyed.
+ // be flushed before RemoteSender is being destroyed.
transactions.clear();
logSinkCache.clear();
inactiveLogSinks.clear();
}
- void setChangeNotifier(const ChangeNotifierPtr &_changeNotifier) {
- changeNotifier = _changeNotifier;
- changeNotifier->getLastPos = boost::bind(&LoggingServer::getLastPos,
- this, _1, _2, _3);
- }
-
- string getLastPos(const StaticString &groupName, const StaticString &nodeName,
- const StaticString &category) const
- {
- md5_state_t state;
- md5_byte_t digest[MD5_SIZE];
- char nodeId[MD5_HEX_SIZE];
- md5_init(&state);
- md5_append(&state, (const md5_byte_t *) nodeName.data(), nodeName.size());
- md5_finish(&state, digest);
- toHex(StaticString((const char *) digest, MD5_SIZE), nodeId);
-
- string dir = determineFilename(groupName, nodeId, category);
- string subdir, component;
- subdir.reserve(13); // It's a string that looks like: "2010/06/24/12"
-
- try {
- // Loop 4 times to process year, month, day, hour.
- for (int i = 0; i < 4; i++) {
- bool found = getLastEntryInDirectory(dir, component);
- if (!found) {
- return string();
- }
- dir.append("/");
- dir.append(component);
- if (i != 0) {
- subdir.append("/");
- }
- subdir.append(component);
- }
- // After the loop, new dir == old dir + "/" + subdir
- } catch (const SystemException &e) {
- if (e.code() == ENOENT) {
- return string();
- } else {
- throw;
- }
- }
-
- string &filename = dir;
- filename.append("/log.txt");
-
- struct stat buf;
- if (stat(filename.c_str(), &buf) == -1) {
- if (errno == ENOENT) {
- return string();
- } else {
- int e = errno;
- throw FileSystemException("Cannot stat() " + filename, e,
- filename);
- }
- } else {
- return subdir + "/" + toString(buf.st_size);
- }
- }
-
void dump(ostream &stream) const {
TransactionMap::const_iterator it;
TransactionMap::const_iterator end = transactions.end();
- stream << "Number of clients: " << getClients().size() << "\n";
+ stream << "Number of clients : " << getClients().size() << "\n";
+ stream << "RemoteSender queue: " << remoteSender.queued() << " items\n";
stream << "Open transactions: " << transactions.size() << "\n";
for (it = transactions.begin(); it != end; it++) {
const TransactionPtr &transaction = it->second;
transaction->dump(stream);
}