lib/riemann/dash/public/subs.js in riemann-dash-0.2.7 vs lib/riemann/dash/public/subs.js in riemann-dash-0.2.8
- old
+ new
@@ -9,11 +9,11 @@
// Subscription ID counter.
var id_counter = -1;
// Subscriptions
var subs = {};
-
+
// Switch to turn on/off event processing
var active = true;
// Error queue for notification
var errorQueue = [];
@@ -56,10 +56,11 @@
bottom = prioqueue.bottomPriority();
if (bottom) {
expiry = new Date(bottom);
if (expiry < now) {
expired = prioqueue.shift();
+ expired.time = expiry;
expired.state = 'expired';
sub.f(expired);
}
}
}
@@ -75,13 +76,13 @@
this.clockSub = false;
},
isOpen: function() {
if (server_type == "ws") {
- return this.ws && (this.ws.readyState != EventSource.CLOSED)
- } else {
return this.ws && (this.ws.readyState != WebSocket.CLOSED)
+ } else {
+ return this.ws && (this.ws.readyState != EventSource.CLOSED)
}
},
isClosed: function() { return !this.isOpen() },
url: function() {
@@ -96,12 +97,13 @@
}
},
open: function() {
if (this.isOpen()) return this;
- console.log("will open url: " + this.url());
+ console.log("will open url: " + this.url());
+
var ws;
if (server_type == "sse") {
ws = this.ws = new EventSource(this.url());
} else {
ws = this.ws = new WebSocket(this.url());
@@ -126,25 +128,28 @@
t1 = Date.now();
if (active) {
var event = JSON.parse(e.data);
event.time = Date.parse(event.time);
clock.advance(event.time);
- if (event.ttl > 0) { // only expired events have no TTL
+
+ // Update local index.
+ if (event.state !== "expired") { // only expired events have no TTL
+ // TODO: get a prioqueue supporting delete so we can delete expired
+ // events.
this.prioqueue.update(
{host: event.host, service: event.service},
- event.time + (event.ttl * 1000) // convert TTL to ms
+ event.time + ((event.ttl || 60) * 1000) // convert TTL to ms
);
+ this.f(event);
}
- this.f(event);
}
var t2 = Date.now();
load1(t1, t2);
load5(t1, t2);
}, this);
return this;
-
},
close: function() {
if (this.ws) {
this.ws.close();
@@ -174,13 +179,10 @@
});
if (_.isEmpty(closed)) {
// Done here.
return;
}
-
- // Display reconnection notice
- toastr.warning(_.size(closed) + " lost connections—check the server field above.");
// Reopen
_.each(closed, function(sub) {
open(sub);
});
@@ -188,13 +190,12 @@
var notifyErrors = function() {
if (errorQueue.length == 0) {
return;
}
- toastr.warning(errorQueue.length + " socket errors");
+ toastr.warning(errorQueue.length + " socket errors; check the server field above.");
errorQueue.length = 0;
- converge();
}
// Periodically notify of errors.
window.setInterval(notifyErrors, 100);
@@ -214,10 +215,10 @@
subs: function() { return subs; },
enable: function() { active = true; console.log("Subs enabled."); },
disable: function() { active = false; console.log("Subs disabled."); },
toggle: function() {
active = ! active;
- if (active) {
+ if (active) {
console.log("Subs enabled.");
} else {
console.log("Subs disabled.");
}
},