/**
 * Projects each element of an observable sequence into zero or more windows.
 *
 * The overload is chosen from the arguments:
 *  - a function as first argument: it is used as a closing selector and the
 *    windows are non-overlapping (a new one opens when the previous closes);
 *    the second argument is ignored;
 *  - exactly one non-function argument: it is used as a sequence of window
 *    boundaries;
 *  - anything else: the first argument is the sequence of window openings and
 *    the second is the closing selector for each opened window.
 *
 * @param {Mixed} windowOpeningsOrClosingSelector Observable sequence whose elements denote the creation of new windows, or, a function invoked to define the boundaries of the produced windows (a new window is started when the previous one is closed, resulting in non-overlapping windows).
 * @param {Function} [windowClosingSelector] A function invoked to define the closing of each produced window. If a closing selector function is specified for the first parameter, this parameter is ignored.
 * @returns {Observable} An observable sequence of windows.
 */
observableProto.window = function (windowOpeningsOrClosingSelector, windowClosingSelector) {
  if (typeof windowOpeningsOrClosingSelector === 'function') {
    return observableWindowWithClosingSelector.call(this, windowOpeningsOrClosingSelector);
  }
  if (arguments.length === 1) {
    return observableWindowWithBoundaries.call(this, windowOpeningsOrClosingSelector);
  }
  return observableWindowWithOpenings.call(this, windowOpeningsOrClosingSelector, windowClosingSelector);
};

/**
 * Windows driven by an openings sequence plus a per-window closing selector.
 * Must be invoked with the source observable as `this`.
 *
 * Implemented as a `groupJoin` called on `windowOpenings` with the source as
 * the other sequence; `windowClosingSelector` and `observableEmpty` are passed
 * as the two selector arguments, and the result selector discards the opening
 * value and returns only the window.
 *
 * @param {Observable} windowOpenings Sequence whose elements open new windows.
 * @param {Function} windowClosingSelector Selector handed to `groupJoin` to close each window.
 * @returns {Observable} An observable sequence of windows.
 */
function observableWindowWithOpenings(windowOpenings, windowClosingSelector) {
  return windowOpenings.groupJoin(this, windowClosingSelector, observableEmpty, function (_, win) {
    return win;
  });
}

/**
 * Non-overlapping windows delimited by the elements of `windowBoundaries`.
 * Must be invoked with the source observable as `this`.
 *
 * The first window is emitted on subscription, before the source is
 * subscribed. Every boundary element completes the current window and emits a
 * fresh one. An error or completion from either the source or the boundaries
 * is forwarded to the current window first and then to the outer observer.
 *
 * @param {Observable|Promise} windowBoundaries Boundary markers; a promise is converted with `observableFromPromise`.
 * @returns {Observable} An observable sequence of windows.
 */
function observableWindowWithBoundaries(windowBoundaries) {
  var source = this;
  return new AnonymousObservable(function (observer) {
    var win = new Subject(),
      d = new CompositeDisposable(),
      r = new RefCountDisposable(d);

    // Terminal notifications always hit the open window before the outer
    // observer. These read `win` at call time because it is replaced whenever
    // a boundary arrives.
    function failAll(err) {
      win.onError(err);
      observer.onError(err);
    }
    function completeAll() {
      win.onCompleted();
      observer.onCompleted();
    }

    observer.onNext(addRef(win, r));

    d.add(source.subscribe(function (x) {
      win.onNext(x);
    }, failAll, completeAll));

    if (isPromise(windowBoundaries)) {
      windowBoundaries = observableFromPromise(windowBoundaries);
    }

    d.add(windowBoundaries.subscribe(function () {
      // The boundary value itself is irrelevant; it only rolls the window over.
      win.onCompleted();
      win = new Subject();
      observer.onNext(addRef(win, r));
    }, failAll, completeAll));

    return r;
  }, source);
}

/**
 * Non-overlapping windows where each window is closed by the sequence returned
 * from `windowClosingSelector`. Must be invoked with the source observable as
 * `this`.
 *
 * The first window is emitted on subscription. For each window the selector is
 * called once; only the completion of `take(1)` on its result matters (values
 * go to `noop`), at which point the current window completes, a new one is
 * emitted and the selector is called again.
 *
 * @param {Function} windowClosingSelector Zero-argument function returning an observable (or promise) that signals the end of the current window.
 * @returns {Observable} An observable sequence of windows.
 */
function observableWindowWithClosingSelector(windowClosingSelector) {
  var source = this;
  return new AnonymousObservable(function (observer) {
    var m = new SerialDisposable(),
      d = new CompositeDisposable(m),
      r = new RefCountDisposable(d),
      win = new Subject();

    // Error the open window first, then the outer observer. `win` is read at
    // call time because it is replaced on every window close.
    function failAll(err) {
      win.onError(err);
      observer.onError(err);
    }

    observer.onNext(addRef(win, r));

    d.add(source.subscribe(function (x) {
      win.onNext(x);
    }, failAll, function () {
      win.onCompleted();
      observer.onCompleted();
    }));

    function createWindowClose() {
      var windowClose;
      try {
        windowClose = windowClosingSelector();
      } catch (e) {
        // A throwing selector is reported like every other failure here: the
        // open window is errored too, so its subscribers are not left waiting
        // on a window that will never terminate.
        failAll(e);
        return;
      }

      if (isPromise(windowClose)) {
        windowClose = observableFromPromise(windowClose);
      }

      // `m` holds the subscription to the current closing sequence; setting a
      // new disposable on it replaces the one from the previous window.
      var m1 = new SingleAssignmentDisposable();
      m.setDisposable(m1);
      m1.setDisposable(windowClose.take(1).subscribe(noop, failAll, function () {
        win.onCompleted();
        win = new Subject();
        observer.onNext(addRef(win, r));
        createWindowClose();
      }));
    }

    createWindowClose();
    return r;
  }, source);
}