lib/riemann/dash/public/subs.js in riemann-dash-0.2.1 vs lib/riemann/dash/public/subs.js in riemann-dash-0.2.3

- old
+ new

@@ -1,6 +1,7 @@ var subs = (function() { + // What server shall we connect to by default? var server; // Subscription ID counter. var id_counter = -1; @@ -23,94 +24,108 @@ return id_counter += 1; } // Close a subscription's websocket channel. var close = function(sub) { - if (sub.ws == null) { - return sub; - } - sub.ws.close(); - sub.ws == null; - return sub; + return sub.close(); } // Closes a subscription and deletes it from the subscription manager. var unsubscribe = function(sub) { delete subs[sub.id]; - close(sub); + return sub.close(); } // Unsubscribe from all subscriptions. var unsubscribeAll = function() { _.each(subs, unsubscribe); } // Open a subscription's websocket channel. var open = function(sub) { - if (sub.ws != null && sub.ws.readyState != WebSocket.CLOSED) { - return sub; - } + return sub.open(); + } - var f = sub.f; - var queryString = "query=" + encodeURI(sub.query); - var uri = "ws://" + server + "/index?subscribe=true&" + queryString; - sub.ws = new WebSocket(uri); - var $ws = $(sub.ws); - - $ws.bind('open', function() { - console.log("Socket opened", sub.query); - }); + var Subscription = Backbone.Model.extend({ - $ws.bind('close', function(e) { - console.log("Socket closed", sub.query); - sub.ws = null; - }); + initialize: function(id, query, f) { + this.id = id; + this.query = query; + this.f = f; + }, - $ws.bind('error', function(e) { - console.log("Socket error", sub.query); - errorQueue.push(e); - ws.close(); - }); + isOpen: function() { + return this.ws && (this.ws.readyState != WebSocket.CLOSED) + }, + isClosed: function() { return !this.isOpen() }, - $ws.bind('message', function(e) { - t1 = Date.now(); - if (active) { - f(JSON.parse(e.originalEvent.data)); - } - load1(t1, Date.now()); - load5(t1, Date.now()); - }); + url: function() { + var queryString = "query=" + encodeURI(this.query); + return "ws://" + server + "/index?subscribe=true&" + queryString; + }, - return sub; - } + open: function() { + if (this.isOpen()) return this; + var ws = this.ws = new WebSocket(this.url()); + ws.onopen = _.bind(function() { + console.log("Socket opened", this.query); + }, this); + + ws.onclose = _.bind(function(e) { + console.log("Socket closed", this.query); + this.ws = null; + }, this); + + ws.onerror = _.bind(function(e) { + console.log("Socket error", this.query); + errorQueue.push(e); + this.close(); + }, this); + + ws.onmessage = _.bind(function(e) { + t1 = Date.now(); + if (active) { + this.f(JSON.parse(e.data)); + } + load1(t1, Date.now()); + load5(t1, Date.now()); + }, this); + + return this; + + }, + + close: function() { + if (this.ws) { + this.ws.close(); + this.ws = void 0; + } + return this; + } + }); + // Add a subscription. Returns a subscription object. Subscriptions are // opened immediately. var subscribe = function(query, f) { - var sub = { - id: newId(), - query: query, - f: f, - ws: null - } + var sub = new Subscription(newId(), query, f).open(); subs[sub.id] = sub; - open(sub); return sub; } // Reconnect all inactive subs. var converge = function() { var closed = _.filter(subs, function(sub) { - return (sub.ws == null || sub.ws.readyState == WebSocket.CLOSED); + return sub.isClosed(); }); if (_.isEmpty(closed)) { // Done here. return; } // Display reconnection notice - toastr.warning(_.size(closed) + " lost connections"); + toastr.warning(_.size(closed) + " lost connections—check the server field above."); // Reopen _.each(closed, function(sub) { open(sub); }); @@ -118,10 +133,10 @@ var notifyErrors = function() { if (errorQueue.length == 0) { return; } - _.warning(errorQueue.length + " socket errors"); + toastr.warning(errorQueue.length + " socket errors"); errorQueue.length = 0; converge(); } // Periodically notify of errors.