Sha256: e9b9980f0ad907d0b6e88a8c8d22ade52f943d79a4036d15f8ee821af0928aac

Contents?: true

Size: 1.44 KB

Versions: 29

Compression:

Stored size: 1.44 KB

Contents

'use strict';

var Cell   = require('./cell'),
    Pledge = require('./pledge');

var Pipeline = function(sessions) {
  this._cells   = sessions.map(function(session) { return new Cell(session) });
  this._stopped = {incoming: false, outgoing: false};
};

Pipeline.prototype.processIncomingMessage = function(message, callback, context) {
  if (this._stopped.incoming) return;
  this._loop('incoming', this._cells.length - 1, -1, -1, message, callback, context);
};

Pipeline.prototype.processOutgoingMessage = function(message, callback, context) {
  if (this._stopped.outgoing) return;
  this._loop('outgoing', 0, this._cells.length, 1, message, callback, context);
};

Pipeline.prototype.close = function(callback, context) {
  this._stopped = {incoming: true, outgoing: true};

  var closed = this._cells.map(function(a) { return a.close() });
  if (callback)
    Pledge.all(closed).then(function() { callback.call(context) });
};

Pipeline.prototype._loop = function(direction, start, end, step, message, callback, context) {
  var cells = this._cells,
      n     = cells.length,
      self  = this;

  while (n--) cells[n].pending(direction);

  var pipe = function(index, error, msg) {
    if (index === end) return callback.call(context, error, msg);

    cells[index][direction](error, msg, function(err, m) {
      if (err) self._stopped[direction] = true;
      pipe(index + step, err, m);
    });
  };
  pipe(start, null, message);
};

module.exports = Pipeline;

Version data entries

29 entries across 29 versions & 11 rubygems

Version Path
disco_app-0.18.0 test/dummy/node_modules/websocket-extensions/lib/pipeline/index.js
disco_app-0.18.2 test/dummy/node_modules/websocket-extensions/lib/pipeline/index.js
disco_app-0.16.1 test/dummy/node_modules/websocket-extensions/lib/pipeline/index.js
disco_app-0.15.2 test/dummy/node_modules/websocket-extensions/lib/pipeline/index.js
disco_app-0.18.4 test/dummy/node_modules/websocket-extensions/lib/pipeline/index.js
disco_app-0.18.1 test/dummy/node_modules/websocket-extensions/lib/pipeline/index.js
disco_app-0.12.7.pre.puma.pre.3 test/dummy/node_modules/websocket-extensions/lib/pipeline/index.js
disco_app-0.14.0 test/dummy/node_modules/websocket-extensions/lib/pipeline/index.js
disco_app-0.13.6.pre.puma.pre.3 test/dummy/node_modules/websocket-extensions/lib/pipeline/index.js
enju_library-0.3.8 spec/dummy/node_modules/websocket-extensions/lib/pipeline/index.js
ilog-0.4.1 node_modules/websocket-extensions/lib/pipeline/index.js
ilog-0.4.0 node_modules/websocket-extensions/lib/pipeline/index.js
ilog-0.3.3 node_modules/websocket-extensions/lib/pipeline/index.js
jester-data-8.0.0 node_modules/websocket-extensions/lib/pipeline/index.js
ezii-os-5.2.1 node_modules/websocket-extensions/lib/pipeline/index.js
ezii-os-2.0.1 node_modules/websocket-extensions/lib/pipeline/index.js
ezii-os-1.1.0 node_modules/websocket-extensions/lib/pipeline/index.js
ezii-os-1.0.0 node_modules/websocket-extensions/lib/pipeline/index.js
ezii-os-0.0.0.1.0 node_modules/websocket-extensions/lib/pipeline/index.js
ezii-os-0.0.0.0.1 node_modules/websocket-extensions/lib/pipeline/index.js