Sha256: 430157d415210e4f9306de69ec637d84f76cb8e152806c7f500b8e255879af88
Contents?: true
Size: 1.58 KB
Versions: 14
Compression:
Stored size: 1.58 KB
Contents
/**
 * Flattens a sequence of inner sources by subscribing to only one of them at a time.
 * While an inner subscription is active, any further inner source emitted by this
 * sequence is ignored and never subscribed to. Once the active inner source completes,
 * the next inner source to arrive is subscribed to.
 * Inner values for which isPromise returns true are first converted with
 * observableFromPromise.
 * @returns {Observable} An observable that forwards the values of whichever inner source
 * is currently subscribed, forwards errors from either level, and completes once this
 * sequence has completed and no inner subscription is active.
 */
observableProto.exclusive = function () {
  var outerSource = this;

  return new AnonymousObservable(function (observer) {
    var innerActive = false;
    var outerFinished = false;
    var outerSubscription = new SingleAssignmentDisposable();
    var group = new CompositeDisposable();

    // Completes downstream only when the outer sequence is finished, no inner
    // subscription is active, and the group holds nothing but the outer subscription.
    function completeIfIdle() {
      if (outerFinished && !innerActive && group.length === 1) {
        observer.onCompleted();
      }
    }

    function onInnerSource(innerSource) {
      // An inner subscription is still running: drop this source.
      if (innerActive) {
        return;
      }
      innerActive = true;

      var inner = innerSource;
      if (isPromise(inner)) {
        inner = observableFromPromise(inner);
      }

      var innerSubscription = new SingleAssignmentDisposable();
      group.add(innerSubscription);

      innerSubscription.setDisposable(inner.subscribe(
        observer.onNext.bind(observer),
        observer.onError.bind(observer),
        function () {
          // Release the finished inner subscription before accepting a new one.
          group.remove(innerSubscription);
          innerActive = false;
          completeIfIdle();
        }
      ));
    }

    group.add(outerSubscription);

    outerSubscription.setDisposable(outerSource.subscribe(
      onInnerSource,
      observer.onError.bind(observer),
      function () {
        outerFinished = true;
        completeIfIdle();
      }
    ));

    return group;
  }, this);
};
Version data entries
14 entries across 7 versions & 1 rubygems