Sha256: 0ae389def8d20c10b294ac951238314a394be2ec03c540f895d5a4c41f95ea5b
Contents?: true
Size: 1.6 KB
Versions: 14
Compression:
Stored size: 1.6 KB
Contents
function sampleObservable(source, sampler) { return new AnonymousObservable(function (observer) { var atEnd, value, hasValue; function sampleSubscribe() { if (hasValue) { hasValue = false; observer.onNext(value); } atEnd && observer.onCompleted(); } return new CompositeDisposable( source.subscribe(function (newValue) { hasValue = true; value = newValue; }, observer.onError.bind(observer), function () { atEnd = true; }), sampler.subscribe(sampleSubscribe, observer.onError.bind(observer), sampleSubscribe) ); }, source); } /** * Samples the observable sequence at each interval. * * @example * 1 - res = source.sample(sampleObservable); // Sampler tick sequence * 2 - res = source.sample(5000); // 5 seconds * 2 - res = source.sample(5000, Rx.Scheduler.timeout); // 5 seconds * * @param {Mixed} intervalOrSampler Interval at which to sample (specified as an integer denoting milliseconds) or Sampler Observable. * @param {Scheduler} [scheduler] Scheduler to run the sampling timer on. If not specified, the timeout scheduler is used. * @returns {Observable} Sampled observable sequence. */ observableProto.sample = observableProto.throttleLatest = function (intervalOrSampler, scheduler) { isScheduler(scheduler) || (scheduler = timeoutScheduler); return typeof intervalOrSampler === 'number' ? sampleObservable(this, observableinterval(intervalOrSampler, scheduler)) : sampleObservable(this, intervalOrSampler); };
Version data entries
14 entries across 7 versions & 1 rubygems