lib/riemann/dash/public/subs.js in riemann-dash-0.2.1 vs lib/riemann/dash/public/subs.js in riemann-dash-0.2.3
- old
+ new
@@ -1,6 +1,7 @@
var subs = (function() {
+
// What server shall we connect to by default?
var server;
// Subscription ID counter.
var id_counter = -1;
@@ -23,94 +24,108 @@
return id_counter += 1;
}
// Close a subscription's websocket channel.
var close = function(sub) {
- if (sub.ws == null) {
- return sub;
- }
- sub.ws.close();
- sub.ws == null;
- return sub;
+ return sub.close();
}
// Closes a subscription and deletes it from the subscription manager.
var unsubscribe = function(sub) {
delete subs[sub.id];
- close(sub);
+ return sub.close();
}
// Unsubscribe from all subscriptions.
var unsubscribeAll = function() {
_.each(subs, unsubscribe);
}
// Open a subscription's websocket channel.
var open = function(sub) {
- if (sub.ws != null && sub.ws.readyState != WebSocket.CLOSED) {
- return sub;
- }
+ return sub.open();
+ }
- var f = sub.f;
- var queryString = "query=" + encodeURI(sub.query);
- var uri = "ws://" + server + "/index?subscribe=true&" + queryString;
- sub.ws = new WebSocket(uri);
- var $ws = $(sub.ws);
-
- $ws.bind('open', function() {
- console.log("Socket opened", sub.query);
- });
+ var Subscription = Backbone.Model.extend({
- $ws.bind('close', function(e) {
- console.log("Socket closed", sub.query);
- sub.ws = null;
- });
+ initialize: function(id, query, f) {
+ this.id = id;
+ this.query = query;
+ this.f = f;
+ },
- $ws.bind('error', function(e) {
- console.log("Socket error", sub.query);
- errorQueue.push(e);
- ws.close();
- });
+ isOpen: function() {
+ return this.ws && (this.ws.readyState != WebSocket.CLOSED)
+ },
+ isClosed: function() { return !this.isOpen() },
- $ws.bind('message', function(e) {
- t1 = Date.now();
- if (active) {
- f(JSON.parse(e.originalEvent.data));
- }
- load1(t1, Date.now());
- load5(t1, Date.now());
- });
+ url: function() {
+ var queryString = "query=" + encodeURI(this.query);
+ return "ws://" + server + "/index?subscribe=true&" + queryString;
+ },
- return sub;
- }
+ open: function() {
+ if (this.isOpen()) return this;
+ var ws = this.ws = new WebSocket(this.url());
+ ws.onopen = _.bind(function() {
+ console.log("Socket opened", this.query);
+ }, this);
+
+ ws.onclose = _.bind(function(e) {
+ console.log("Socket closed", this.query);
+ this.ws = null;
+ }, this);
+
+ ws.onerror = _.bind(function(e) {
+ console.log("Socket error", this.query);
+ errorQueue.push(e);
+ this.close();
+ }, this);
+
+ ws.onmessage = _.bind(function(e) {
+ t1 = Date.now();
+ if (active) {
+ this.f(JSON.parse(e.data));
+ }
+ load1(t1, Date.now());
+ load5(t1, Date.now());
+ }, this);
+
+ return this;
+
+ },
+
+ close: function() {
+ if (this.ws) {
+ this.ws.close();
+ this.ws = void 0;
+ }
+ return this;
+ }
+ });
+
// Add a subscription. Returns a subscription object. Subscriptions are
// opened immediately.
var subscribe = function(query, f) {
- var sub = {
- id: newId(),
- query: query,
- f: f,
- ws: null
- }
+ var sub = new Subscription(newId(), query, f).open();
subs[sub.id] = sub;
- open(sub);
return sub;
}
// Reconnect all inactive subs.
var converge = function() {
var closed = _.filter(subs, function(sub) {
- return (sub.ws == null || sub.ws.readyState == WebSocket.CLOSED);
+ return sub.isClosed();
});
if (_.isEmpty(closed)) {
// Done here.
return;
}
// Display reconnection notice
- toastr.warning(_.size(closed) + " lost connections");
+ toastr.warning(_.size(closed) + " lost connections—check the server field above.");
// Reopen
_.each(closed, function(sub) {
open(sub);
});
@@ -118,10 +133,10 @@
var notifyErrors = function() {
if (errorQueue.length == 0) {
return;
}
- _.warning(errorQueue.length + " socket errors");
+ toastr.warning(errorQueue.length + " socket errors");
errorQueue.length = 0;
converge();
}
// Periodically notify of errors.