Sha256: 51063935bba2665674de4f9b78a0a296335b1f9b15f0bafeaa0a59f7ad184436
Contents?: true
Size: 1.65 KB
Versions: 14
Compression:
Stored size: 1.65 KB
Contents
/** * Returns an array with the elements within the specified duration from the end of the observable source sequence, using the specified scheduler to run timers. * @description * This operator accumulates a queue with a length enough to store elements received during the initial duration window. * As more elements are received, elements older than the specified duration are taken from the queue and produced on the * result sequence. This causes elements to be delayed with duration. * @param {Number} duration Duration for taking elements from the end of the sequence. * @param {Scheduler} scheduler Scheduler to run the timer on. If not specified, defaults to Rx.Scheduler.timeout. * @returns {Observable} An observable sequence containing a single array with the elements taken during the specified duration from the end of the source sequence. */ observableProto.takeLastBufferWithTime = function (duration, scheduler) { var source = this; isScheduler(scheduler) || (scheduler = timeoutScheduler); return new AnonymousObservable(function (o) { var q = []; return source.subscribe(function (x) { var now = scheduler.now(); q.push({ interval: now, value: x }); while (q.length > 0 && now - q[0].interval >= duration) { q.shift(); } }, function (e) { o.onError(e); }, function () { var now = scheduler.now(), res = []; while (q.length > 0) { var next = q.shift(); now - next.interval <= duration && res.push(next.value); } o.onNext(res); o.onCompleted(); }); }, source); };
Version data entries
14 entries across 7 versions & 1 rubygems