/** * Represents an object that is both an observable sequence as well as an observer. * Each notification is broadcasted to all subscribed observers. */ var Subject = Rx.Subject = (function (__super__) { function subscribe(observer) { checkDisposed(this); if (!this.isStopped) { this.observers.push(observer); return new InnerSubscription(this, observer); } if (this.hasError) { observer.onError(this.error); return disposableEmpty; } observer.onCompleted(); return disposableEmpty; } inherits(Subject, __super__); /** * Creates a subject. */ function Subject() { __super__.call(this, subscribe); this.isDisposed = false, this.isStopped = false, this.observers = []; this.hasError = false; } addProperties(Subject.prototype, Observer.prototype, { /** * Indicates whether the subject has observers subscribed to it. * @returns {Boolean} Indicates whether the subject has observers subscribed to it. */ hasObservers: function () { return this.observers.length > 0; }, /** * Notifies all subscribed observers about the end of the sequence. */ onCompleted: function () { checkDisposed(this); if (!this.isStopped) { this.isStopped = true; for (var i = 0, os = cloneArray(this.observers), len = os.length; i < len; i++) { os[i].onCompleted(); } this.observers.length = 0; } }, /** * Notifies all subscribed observers about the exception. * @param {Mixed} error The exception to send to all observers. */ onError: function (error) { checkDisposed(this); if (!this.isStopped) { this.isStopped = true; this.error = error; this.hasError = true; for (var i = 0, os = cloneArray(this.observers), len = os.length; i < len; i++) { os[i].onError(error); } this.observers.length = 0; } }, /** * Notifies all subscribed observers about the arrival of the specified element in the sequence. * @param {Mixed} value The value to send to all observers. */ onNext: function (value) { checkDisposed(this); if (!this.isStopped) { for (var i = 0, os = cloneArray(this.observers), len = os.length; i < len; i++) { os[i].onNext(value); } } }, /** * Unsubscribe all observers and release resources. */ dispose: function () { this.isDisposed = true; this.observers = null; } }); /** * Creates a subject from the specified observer and observable. * @param {Observer} observer The observer used to send messages to the subject. * @param {Observable} observable The observable used to subscribe to messages sent from the subject. * @returns {Subject} Subject implemented using the given observer and observable. */ Subject.create = function (observer, observable) { return new AnonymousSubject(observer, observable); }; return Subject; }(Observable));