lib/ably/realtime/channel.rb in ably-0.7.2 vs lib/ably/realtime/channel.rb in ably-0.7.4

- old
+ new

@@ -31,10 +31,11 @@ class Channel include Ably::Modules::Conversions include Ably::Modules::EventEmitter include Ably::Modules::EventMachineHelpers include Ably::Modules::AsyncWrapper + include Ably::Modules::MessageEmitter extend Ably::Modules::Enum STATE = ruby_enum('STATE', :initialized, :attaching, @@ -82,18 +83,18 @@ ensure_utf_8 :name, name @client = client @name = name @options = channel_options.clone.freeze - @subscriptions = Hash.new { |hash, key| hash[key] = [] } @queue = [] @state_machine = ChannelStateMachine.new(self) @state = STATE(state_machine.current_state) @manager = ChannelManager.new(self, client.connection) setup_event_handlers + setup_presence end # Publish a message on the channel # # @param name [String] The event name of the message @@ -121,45 +122,37 @@ end end # Subscribe to messages matching providing event name, or all messages if event name not provided # - # @param name [String] The event name of the message to subscribe to if provided. Defaults to all events. + # @param names [String] The event name of the message to subscribe to if provided. Defaults to all events. # @yield [Ably::Models::Message] For each message received, the block is called # # @return [void] # - def subscribe(name = :all, &callback) + def subscribe(*names, &callback) attach unless attached? || attaching? - subscriptions[message_name_key(name)] << callback + super end # Unsubscribe the matching block for messages matching providing event name, or all messages if event name not provided. # If a block is not provided, all subscriptions will be unsubscribed # - # @param name [String] The event name of the message to subscribe to if provided. Defaults to all events. + # @param names [String] The event name of the message to subscribe to if provided. Defaults to all events. # # @return [void] # - def unsubscribe(name = :all, &callback) - if message_name_key(name) == :all - subscriptions.keys - else - Array(message_name_key(name)) - end.each do |key| - subscriptions[key].delete_if do |block| - !block_given? || callback == block - end - end + def unsubscribe(*names, &callback) + super end # Attach to this channel, and call the block if provided when attached. # Attaching to a channel is implicit in when a message is published or #subscribe is called, so it is uncommon # to need to call attach explicitly. # # @yield [Ably::Realtime::Channel] Block is called as soon as this channel is in the Attached state - # @return [EventMachine::Deferrable] Deferrable that supports both success (callback) and failure (errback) callback + # @return [Ably::Util::SafeDeferrable] Deferrable that supports both success (callback) and failure (errback) callback # def attach(&success_block) transition_state_machine :attaching if can_transition_to?(:attaching) deferrable_for_state_change_to(STATE.Attached, &success_block) end @@ -181,21 +174,21 @@ # # @return {Ably::Realtime::Presence} # def presence attach if initialized? - @presence ||= Presence.new(self) + @presence end # Return the message history of the channel # # @param (see Ably::Rest::Channel#history) # @option options (see Ably::Rest::Channel#history) # # @yield [Ably::Models::PaginatedResource<Ably::Models::Message>] An Array of {Ably::Models::Message} objects that supports paging (#next_page, #first_page) # - # @return [EventMachine::Deferrable] + # @return [Ably::Util::SafeDeferrable] def history(options = {}, &callback) async_wrap(callback) do rest_channel.history(options.merge(async_blocking_operations: true)) end end @@ -212,25 +205,28 @@ # @api private def set_failed_channel_error_reason(error) @error_reason = error end + # @api private + def clear_error_reason + @error_reason = nil + end + # Used by {Ably::Modules::StateEmitter} to debug state changes # @api private def logger client.logger end private - attr_reader :queue, :subscriptions + attr_reader :queue def setup_event_handlers __incoming_msgbus__.subscribe(:message) do |message| message.decode self - - subscriptions[:all].each { |cb| cb.call(message) } - subscriptions[message.name].each { |cb| cb.call(message) } + emit_message message.name, message end on(STATE.Attached) do process_queue end @@ -271,11 +267,11 @@ def create_message(name, data) message = { name: name } message.merge!(data: data) unless data.nil? message.merge!(clientId: client.client_id) if client.client_id - Ably::Models::Message.new(message, nil).tap do |message| + Ably::Models::Message.new(message, logger: logger).tap do |message| message.encode self end end def rest_channel @@ -284,15 +280,14 @@ def connection client.connection end - def message_name_key(name) - if name == :all - :all - else - name.to_s - end + def setup_presence + @presence ||= Presence.new(self) end end end end + +require 'ably/realtime/channel/channel_manager' +require 'ably/realtime/channel/channel_state_machine'