Sha256: a4a19ba34effa38a8978100381a66b2a3cd63944f1878deabcefab2216b29eae
Contents?: true
Size: 1.82 KB
Versions: 14
Compression:
Stored size: 1.82 KB
Contents
/**
 * Comonadic bind operator.
 *
 * Every element of the source is wrapped in a ChainObservable whose head is
 * that element and whose tail is resolved later (by the next element, or by
 * the source's error/completion). The selector is then mapped over those
 * ChainObservable instances.
 *
 * @param {Function} selector A transform function applied to the ChainObservable created for each source element.
 * @param {Object} scheduler Scheduler used to execute the operation. If not specified, defaults to the ImmediateScheduler.
 * @returns {Observable} An observable sequence which results from the comonadic bind operation.
 */
observableProto.manySelect = function (selector, scheduler) {
  isScheduler(scheduler) || (scheduler = immediateScheduler);
  var source = this;
  return observableDefer(function () {
    // Most recently created link; its tail has not been resolved yet.
    var chain;

    return source
      .map(function (x) {
        var curr = new ChainObservable(x);

        // Resolve the previous link's tail with the NEW link, not the raw
        // element: subscribers read the tail through mergeAll(), so the tail
        // value has to be an Observable (compare onCompleted/onError below,
        // which push Observable.empty()/Observable.throwError(e)).
        chain && chain.onNext(curr);
        chain = curr;

        return curr;
      })
      .tap(
        noop,
        // Terminate the last open link the same way the source terminated.
        function (e) { chain && chain.onError(e); },
        function () { chain && chain.onCompleted(); }
      )
      .observeOn(scheduler)
      .map(selector);
  }, source);
};

/**
 * Observable that is also fed like an observer: it emits `head` and then
 * whatever the Observable delivered through `tail` emits.
 */
var ChainObservable = (function (__super__) {

  /**
   * Subscription logic handed to the base constructor.
   * Schedules on the current-thread scheduler, emits the head, then
   * forwards the flattened tail to the same observer.
   * @param {Observer} observer Observer to deliver head and tail to.
   * @returns {CompositeDisposable} Holds the scheduled action and the tail subscription.
   */
  function subscribe (observer) {
    var self = this, g = new CompositeDisposable();
    g.add(currentThreadScheduler.schedule(function () {
      observer.onNext(self.head);
      g.add(self.tail.mergeAll().subscribe(observer));
    }));

    return g;
  }

  inherits(ChainObservable, __super__);

  /**
   * @param {*} head Value emitted first to every subscriber.
   */
  function ChainObservable(head) {
    __super__.call(this, subscribe);
    this.head = head;
    // Filled exactly once via onNext/onError/onCompleted below.
    this.tail = new AsyncSubject();
  }

  addProperties(ChainObservable.prototype, Observer, {
    // Completion is encoded as an empty tail sequence.
    onCompleted: function () {
      this.onNext(Observable.empty());
    },
    // An error is encoded as a tail sequence that throws it.
    onError: function (e) {
      this.onNext(Observable.throwError(e));
    },
    // Publishes `v` as the tail and completes the tail subject right away.
    // `v` is expected to be an Observable because subscribers apply mergeAll().
    onNext: function (v) {
      this.tail.onNext(v);
      this.tail.onCompleted();
    }
  });

  return ChainObservable;

}(Observable));
Version data entries
14 entries across 7 versions & 1 rubygems