/**
 * Returns the source observable sequence, switching to the other observable sequence if a timeout is signaled.
 *
 * When called with exactly one argument, that argument is treated as timeoutdurationSelector and
 * firstTimeout defaults to observableNever().
 *
 * @param {Observable} [firstTimeout] Observable sequence that represents the timeout for the first element. If not provided, this defaults to Observable.never().
 * @param {Function} timeoutdurationSelector Selector invoked with each source element to retrieve the sequence that represents the timeout between that element and the next one. A result recognised by isPromise is converted with observableFromPromise.
 * @param {Observable} [other] Sequence to return in case of a timeout. If not provided, this is a sequence that fails with new Error('Timeout').
 * @returns {Observable} The source sequence switching to the other sequence in case of a timeout.
 */
observableProto.timeoutWithSelector = function (firstTimeout, timeoutdurationSelector, other) {
  if (arguments.length === 1) {
    timeoutdurationSelector = firstTimeout;
    firstTimeout = observableNever();
  }
  other || (other = observableThrow(new Error('Timeout')));
  var source = this;

  return new AnonymousObservable(function (observer) {
    var subscription = new SerialDisposable(),
      timer = new SerialDisposable(),
      original = new SingleAssignmentDisposable();

    // `subscription` initially holds the source subscription; on timeout it is
    // replaced by the subscription to `other`, which disposes `original`.
    subscription.setDisposable(original);

    // `id` is bumped on every accepted source notification, so a timer armed
    // for an earlier element can tell that it is stale.
    // `switched` becomes true once a timer has won; from then on source
    // notifications must be ignored.
    var id = 0, switched = false;

    function setTimer(timeout) {
      var myId = id;

      function timerWins() {
        var won = id === myId;
        // Record the switch so late or synchronous source notifications are
        // dropped by observerWins(). A stale timer never resets the flag.
        if (won) { switched = true; }
        return won;
      }

      var d = new SingleAssignmentDisposable();
      timer.setDisposable(d);
      d.setDisposable(timeout.subscribe(function () {
        timerWins() && subscription.setDisposable(other.subscribe(observer));
        // Only the first notification of the timeout sequence is needed.
        d.dispose();
      }, function (e) {
        timerWins() && observer.onError(e);
      }, function () {
        // Completion of the timeout sequence also counts as a timeout.
        timerWins() && subscription.setDisposable(other.subscribe(observer));
      }));
    }

    // Arm the first-element timeout before subscribing to the source.
    setTimer(firstTimeout);

    function observerWins() {
      var res = !switched;
      if (res) { id++; }
      return res;
    }

    original.setDisposable(source.subscribe(function (x) {
      if (observerWins()) {
        observer.onNext(x);
        var timeout;
        try {
          timeout = timeoutdurationSelector(x);
        } catch (e) {
          observer.onError(e);
          return;
        }
        setTimer(isPromise(timeout) ? observableFromPromise(timeout) : timeout);
      }
    }, function (e) {
      observerWins() && observer.onError(e);
    }, function () {
      observerWins() && observer.onCompleted();
    }));

    return new CompositeDisposable(subscription, timer);
  }, source);
};