Sha256: d2bc91c8ac8485e2cb9d1b57311e4faa25279ca2b27417835f8ea894afa93437
Contents?: true
Size: 1.67 KB
Versions: 14
Compression:
Stored size: 1.67 KB
Contents
/**
 * Returns the source sequence minus the elements that arrived within `duration`
 * of its completion, using the given scheduler as the clock.
 *
 * @example
 *  1 - res = source.skipLastWithTime(5000);
 *  2 - res = source.skipLastWithTime(5000, scheduler);
 *
 * @description
 * Every incoming element is buffered together with the scheduler time at which it
 * arrived. Whenever a new element arrives, and once more when the source completes,
 * buffered elements that are at least `duration` old are released in arrival order.
 * Elements are therefore delayed, and whatever is still too young at completion is
 * discarded. An error from the source is forwarded without releasing the buffer.
 *
 * @param {Number} duration Duration for skipping elements from the end of the sequence.
 * @param {Scheduler} [scheduler] Scheduler whose `now()` supplies timestamps. When the value
 *   does not pass `isScheduler`, `timeoutScheduler` (Rx.Scheduler.timeout) is used instead.
 * @returns {Observable} An observable sequence with the elements skipped during the specified
 *   duration from the end of the source sequence.
 */
observableProto.skipLastWithTime = function (duration, scheduler) {
  if (!isScheduler(scheduler)) {
    scheduler = timeoutScheduler;
  }
  var source = this;

  return new AnonymousObservable(function (observer) {
    // Arrival-ordered buffer of { interval: <arrival time>, value: <element> }.
    var pending = [];

    // Forwards, oldest first, every buffered element aged `duration` or more at `timestamp`.
    function releaseExpired(timestamp) {
      while (pending.length > 0 && timestamp - pending[0].interval >= duration) {
        observer.onNext(pending.shift().value);
      }
    }

    return source.subscribe(
      function (x) {
        var arrivedAt = scheduler.now();
        pending.push({ interval: arrivedAt, value: x });
        releaseExpired(arrivedAt);
      },
      function (e) {
        observer.onError(e);
      },
      function () {
        // Anything still younger than `duration` here is the skipped tail.
        releaseExpired(scheduler.now());
        observer.onCompleted();
      }
    );
  }, source);
};
Version data entries
14 entries across 7 versions & 1 rubygems