import { Operator } from '../Operator';
import { Subscriber } from '../Subscriber';
import { ArgumentOutOfRangeError } from '../util/ArgumentOutOfRangeError';
import { EmptyObservable } from '../observable/EmptyObservable';
import { Observable } from '../Observable';
import { TeardownLogic } from '../Subscription';
import { MonoTypeOperatorFunction } from '../interfaces';

/**
 * Emits only the last `count` values emitted by the source Observable.
 *
 * Remembers the latest `count` values, then emits those
 * only when the source completes.
 *
 * `takeLast` returns an Observable that emits at most the last `count` values
 * emitted by the source Observable. If the source emits fewer than `count`
 * values then all of its values are emitted. This operator must wait until the
 * `complete` notification emission from the source in order to emit the `next`
 * values on the output Observable, because otherwise it is impossible to know
 * whether or not more values will be emitted on the source. For this reason,
 * all values are emitted synchronously, followed by the complete notification.
 *
 * A `count` of exactly `0` short-circuits: the source is never subscribed to
 * and an empty Observable is returned instead.
 *
 * @example Take the last 3 values of an Observable with many values
 * var many = Rx.Observable.range(1, 100);
 * var lastThree = many.takeLast(3);
 * lastThree.subscribe(x => console.log(x));
 *
 * @see {@link take}
 * @see {@link takeUntil}
 * @see {@link takeWhile}
 * @see {@link skip}
 *
 * @throws {ArgumentOutOfRangeError} If `count < 0`. The check lives in the
 * `TakeLastOperator` constructor, so the error is thrown synchronously when
 * the returned operator function is applied to a source Observable.
 *
 * @param {number} count The maximum number of values to emit from the end of
 * the sequence of values emitted by the source Observable.
 * @return {Observable} An Observable that emits at most the last count
 * values emitted by the source Observable.
 * @method takeLast
 * @owner Observable
 */
export function takeLast<T>(count: number): MonoTypeOperatorFunction<T> {
  return function takeLastOperatorFunction(source: Observable<T>): Observable<T> {
    if (count === 0) {
      // Nothing can ever be emitted, so avoid subscribing to the source at all.
      return new EmptyObservable<T>();
    } else {
      return source.lift(new TakeLastOperator<T>(count));
    }
  };
}

/**
 * Operator that wires a {@link TakeLastSubscriber} between the source and the
 * downstream subscriber.
 *
 * Construction fails with `ArgumentOutOfRangeError` when `total` is negative.
 */
class TakeLastOperator<T> implements Operator<T, T> {
  constructor(private total: number) {
    if (this.total < 0) {
      throw new ArgumentOutOfRangeError();
    }
  }

  /**
   * Subscribes to `source` through a new `TakeLastSubscriber` that forwards to
   * `subscriber`, and returns whatever `source.subscribe` returns as teardown.
   */
  call(subscriber: Subscriber<T>, source: any): TeardownLogic {
    return source.subscribe(new TakeLastSubscriber(subscriber, this.total));
  }
}

/**
 * We need this JSDoc comment for affecting ESDoc.
 * @ignore
 * @extends {Ignored}
 */
class TakeLastSubscriber<T> extends Subscriber<T> {
  // Ring buffer holding at most `total` of the most recent values.
  private ring: T[] = [];
  // Total number of values received so far (not capped at `total`).
  private count = 0;

  constructor(destination: Subscriber<T>, private total: number) {
    super(destination);
  }

  /**
   * Buffers `value`: appends while the ring is still filling up, afterwards
   * overwrites the oldest slot (position `count % total`).
   */
  protected _next(value: T): void {
    const ring = this.ring;
    const total = this.total;
    const count = this.count++;

    if (ring.length < total) {
      ring.push(value);
    } else {
      const index = count % total;
      ring[index] = value;
    }
  }

  /**
   * Flushes the buffered values to the destination in arrival order, then
   * completes the destination.
   */
  protected _complete(): void {
    const destination = this.destination;
    let count = this.count;

    // Guard: with no values received there is nothing to flush, and the
    // modulo below would otherwise be taken against zero.
    if (count > 0) {
      // Number of buffered values: the ring is either full or holds `count`.
      const total = this.count >= this.total ? this.total : this.count;
      const ring = this.ring;

      // `count % total` is the slot of the oldest buffered value; walking
      // forward from there (wrapping around) yields the values oldest-first.
      for (let i = 0; i < total; i++) {
        const idx = (count++) % total;
        destination.next(ring[idx]);
      }
    }

    destination.complete();
  }
}