Sha256: c9fefd285da4676f9d8b4a61fc29dc27131bdee28e7bde6fdccddb49d744d751
Contents?: true
Size: 1.01 KB
Versions: 14
Compression:
Stored size: 1.01 KB
Contents
/** * Executes a transducer to transform the observable sequence * @param {Transducer} transducer A transducer to execute * @returns {Observable} An Observable sequence containing the results from the transducer. */ observableProto.transduce = function(transducer) { var source = this; function transformForObserver(observer) { return { init: function() { return observer; }, step: function(obs, input) { return obs.onNext(input); }, result: function(obs) { return obs.onCompleted(); } }; } return new AnonymousObservable(function(observer) { var xform = transducer(transformForObserver(observer)); return source.subscribe( function(v) { try { xform.step(observer, v); } catch (e) { observer.onError(e); } }, observer.onError.bind(observer), function() { xform.result(observer); } ); }, source); };
Version data entries
14 entries across 7 versions & 1 rubygems