Sha256: 3e2304a8d5ee9c60623a82a78a52aebcdfe77337985f7362ec68bea688fb299e
Contents?: true
Size: 1.61 KB
Versions: 14
Compression:
Stored size: 1.61 KB
Contents
/**
 * Emits only the elements that the source produced within the last `duration`
 * before it completed, using the given scheduler as the clock.
 *
 * @description
 * Every incoming element is stamped with `scheduler.now()` and appended to an
 * internal buffer. Each arrival also trims the front of the buffer, discarding
 * entries whose age has reached `duration`. Nothing is forwarded downstream
 * while the source is still producing: the surviving entries are replayed, in
 * arrival order, only when the source completes, followed by the completion
 * signal. An error from the source is forwarded as-is and the buffered
 * elements are not emitted.
 *
 * Boundary note: an entry that is exactly `duration` old is discarded by the
 * trim that runs on arrival of a newer element (`>=`), but is still emitted
 * if it is exactly `duration` old at completion time (`<=`).
 *
 * @param {Number} duration Length of the trailing time window, in the units reported by `scheduler.now()`.
 * @param {Scheduler} [scheduler] Scheduler used as the time source. When the argument is not a scheduler, `timeoutScheduler` is used instead.
 * @returns {Observable} An observable sequence with the elements received during the final `duration` of the source sequence.
 */
observableProto.takeLastWithTime = function (duration, scheduler) {
  var source = this;
  var clock = isScheduler(scheduler) ? scheduler : timeoutScheduler;

  return new AnonymousObservable(function (observer) {
    // Timestamped entries, oldest first.
    var buffer = [];

    function handleNext(value) {
      var arrivedAt = clock.now();
      buffer.push({ interval: arrivedAt, value: value });

      // Evict entries that have aged out of the window as of this arrival.
      while (buffer.length > 0 && arrivedAt - buffer[0].interval >= duration) {
        buffer.shift();
      }
    }

    function handleError(error) {
      observer.onError(error);
    }

    function handleCompleted() {
      var completedAt = clock.now();

      // Drain the buffer, replaying only entries still inside the window;
      // time may have passed since the last trim, so each entry is re-checked.
      while (buffer.length > 0) {
        var entry = buffer.shift();
        if (completedAt - entry.interval <= duration) {
          observer.onNext(entry.value);
        }
      }

      observer.onCompleted();
    }

    return source.subscribe(handleNext, handleError, handleCompleted);
  }, source);
};
Version data entries
14 entries across 7 versions & 1 rubygems