lib/ably/realtime/channel.rb in ably-0.6.2 vs lib/ably/realtime/channel.rb in ably-0.7.0

- old
+ new

@@ -21,18 +21,14 @@ # Channel::STATE.Attached # Channel::STATE.Detaching # Channel::STATE.Detached # Channel::STATE.Failed # + # Channels emit errors - use `on(:error)` to subscribe to errors + # # @!attribute [r] state # @return {Ably::Realtime::Connection::STATE} channel state - # @!attribute [r] client - # @return {Ably::Realtime::Client} Ably client associated with this channel - # @!attribute [r] name - # @return {String} channel name - # @!attribute [r] options - # @return {Hash} channel options configured for this channel, see {#initialize} for channel_options # class Channel include Ably::Modules::Conversions include Ably::Modules::EventEmitter include Ably::Modules::EventMachineHelpers @@ -46,32 +42,57 @@ :detaching, :detached, :failed ) include Ably::Modules::StateEmitter + include Ably::Modules::UsesStateMachine # Max number of messages to bundle in a single ProtocolMessage MAX_PROTOCOL_MESSAGE_BATCH_SIZE = 50 - attr_reader :client, :name, :options + # {Ably::Realtime::Client} associated with this channel + # @return [Ably::Realtime::Client] + attr_reader :client + # Channel name + # @return [String] + attr_reader :name + + # Channel options configured for this channel, see {#initialize} for channel_options + # @return [Hash] + attr_reader :options + + # When a channel failure occurs this attribute contains the Ably Exception + # @return [Ably::Models::ErrorInfo,Ably::Exceptions::BaseAblyException] + attr_reader :error_reason + + # The Channel manager responsible for attaching, detaching and handling failures for this channel + # @return [Ably::Realtime::Channel::ChannelManager] + # @api private + attr_reader :manager + # Initialize a new Channel object # # @param client [Ably::Rest::Client] # @param name [String] The name of the channel # @param channel_options [Hash] Channel options, currently reserved for Encryption options # @option channel_options [Boolean] :encrypted setting this to true for this channel will encrypt & decrypt all messages automatically # @option channel_options [Hash] :cipher_params A hash of options to configure the encryption. *:key* is required, all other options are optional. See {Ably::Util::Crypto#initialize} for a list of `cipher_params` options # def initialize(client, name, channel_options = {}) + ensure_utf_8 :name, name + @client = client @name = name @options = channel_options.clone.freeze @subscriptions = Hash.new { |hash, key| hash[key] = [] } @queue = [] - @state = STATE.Initialized + @state_machine = ChannelStateMachine.new(self) + @state = STATE(state_machine.current_state) + @manager = ChannelManager.new(self, client.connection) + setup_event_handlers end # Publish a message on the channel # @@ -89,13 +110,15 @@ # # channel.publish('click', 'body').errback do |message, error| # puts "#{message.name} was not received, error #{error.message}" # end # - def publish(name, data, &callback) + def publish(name, data, &success_block) + ensure_utf_8 :name, name + create_message(name, data).tap do |message| - message.callback(&callback) if block_given? + message.callback(&success_block) if block_given? queue_message message end end # Subscribe to messages matching providing event name, or all messages if event name not provided @@ -103,77 +126,65 @@ # @param name [String] The event name of the message to subscribe to if provided. Defaults to all events. # @yield [Ably::Models::Message] For each message received, the block is called # # @return [void] # - def subscribe(name = :all, &blk) + def subscribe(name = :all, &callback) attach unless attached? || attaching? - subscriptions[message_name_key(name)] << blk + subscriptions[message_name_key(name)] << callback end # Unsubscribe the matching block for messages matching providing event name, or all messages if event name not provided. # If a block is not provided, all subscriptions will be unsubscribed # # @param name [String] The event name of the message to subscribe to if provided. Defaults to all events. # # @return [void] # - def unsubscribe(name = :all, &blk) + def unsubscribe(name = :all, &callback) if message_name_key(name) == :all subscriptions.keys else Array(message_name_key(name)) end.each do |key| subscriptions[key].delete_if do |block| - !block_given? || blk == block + !block_given? || callback == block end end end # Attach to this channel, and call the block if provided when attached. # Attaching to a channel is implicit in when a message is published or #subscribe is called, so it is uncommon # to need to call attach explicitly. # # @yield [Ably::Realtime::Channel] Block is called as soon as this channel is in the Attached state - # @return [void] + # @return [EventMachine::Deferrable] Deferrable that supports both success (callback) and failure (errback) callback # - def attach(&block) - if attached? - block.call self if block_given? - else - once(STATE.Attached) { block.call self } if block_given? - if !attaching? - change_state STATE.Attaching - send_attach_protocol_message - end - end + def attach(&success_block) + transition_state_machine :attaching if can_transition_to?(:attaching) + deferrable_for_state_change_to(STATE.Attached, &success_block) end # Detach this channel, and call the block if provided when in a Detached or Failed state # # @yield [Ably::Realtime::Channel] Block is called as soon as this channel is in the Detached or Failed state # @return [void] # - def detach(&block) - if detached? || failed? - block.call self if block_given? - else - once(STATE.Detached, STATE.Failed) { block.call self } if block_given? - if !detaching? - change_state STATE.Detaching - send_detach_protocol_message - end - end + def detach(&success_block) + raise exception_for_state_change_to(:detaching) if failed? || initialized? + transition_state_machine :detaching if can_transition_to?(:detaching) + deferrable_for_state_change_to(STATE.Detached, &success_block) end # Presence object for this Channel. This controls this client's # presence on the channel and may also be used to obtain presence information # and change events for other members of the channel. # # @return {Ably::Realtime::Presence} # def presence + attach if initialized? @presence ||= Presence.new(self) end # Return the message history of the channel # @@ -196,10 +207,15 @@ @__incoming_msgbus__ ||= Ably::Util::PubSub.new( coerce_into: Proc.new { |event| Ably::Models::ProtocolMessage::ACTION(event) } ) end + # @api private + def set_failed_channel_error_reason(error) + @error_reason = error + end + private attr_reader :queue, :subscriptions def setup_event_handlers __incoming_msgbus__.subscribe(:message) do |message| @@ -210,18 +226,10 @@ end on(STATE.Attached) do process_queue end - - connection.on(Connection::STATE.Closed) do - change_state STATE.Detached - end - - connection.on(Connection::STATE.Failed) do - change_state STATE.Failed unless detached? || initialized? - end end # Queue message and process queue if channel is attached. # If channel is not yet attached, attempt to attach it before the message queue is processed. def queue_message(message) @@ -249,24 +257,9 @@ def send_messages_within_protocol_message(messages) client.connection.send_protocol_message( action: Ably::Models::ProtocolMessage::ACTION.Message.to_i, channel: name, messages: messages - ) - end - - def send_attach_protocol_message - send_state_change_protocol_message Ably::Models::ProtocolMessage::ACTION.Attach - end - - def send_detach_protocol_message - send_state_change_protocol_message Ably::Models::ProtocolMessage::ACTION.Detach - end - - def send_state_change_protocol_message(state) - client.connection.send_protocol_message( - action: state.to_i, - channel: name ) end def create_message(name, data) message = { name: name }