module Ably module Realtime # The Channel class represents a Channel belonging to this application. # The Channel instance allows messages to be published and # received, and controls the lifecycle of this instance's # attachment to the channel. # # Channels will always be in one of the following states: # # initialized: 0 # attaching: 1 # attached: 2 # detaching: 3 # detached: 4 # failed: 5 # # Note that the states are available as Enum-like constants: # # Channel::STATE.Initialized # Channel::STATE.Attaching # Channel::STATE.Attached # Channel::STATE.Detaching # Channel::STATE.Detached # Channel::STATE.Failed # # @!attribute [r] state # @return {Ably::Realtime::Connection::STATE} channel state # class Channel include Ably::Modules::Conversions include Ably::Modules::EventEmitter include Ably::Modules::EventMachineHelpers include Ably::Modules::AsyncWrapper include Ably::Modules::MessageEmitter extend Ably::Modules::Enum # ChannelState # The permited states for this channel STATE = ruby_enum('STATE', :initialized, :attaching, :attached, :detaching, :detached, :suspended, :failed ) # ChannelEvent # The permitted channel events that are emitted for this channel EVENT = ruby_enum('EVENT', STATE.to_sym_arr + [:update] ) include Ably::Modules::StateEmitter include Ably::Modules::UsesStateMachine ensure_state_machine_emits 'Ably::Models::ChannelStateChange' # Max number of messages to bundle in a single ProtocolMessage MAX_PROTOCOL_MESSAGE_BATCH_SIZE = 50 # {Ably::Realtime::Client} associated with this channel # @return [Ably::Realtime::Client] attr_reader :client # Channel name # @return [String] attr_reader :name # Channel options configured for this channel, see {#initialize} for channel_options # @return [Hash] attr_reader :options # When a channel failure occurs this attribute contains the Ably Exception # @return [Ably::Models::ErrorInfo,Ably::Exceptions::BaseAblyException] attr_reader :error_reason # The Channel manager responsible for attaching, detaching and handling failures for this channel # @return [Ably::Realtime::Channel::ChannelManager] # @api private attr_reader :manager # Serial number assigned to this channel when it was attached # @return [Integer] # @api private attr_reader :attached_serial # Initialize a new Channel object # # @param client [Ably::Rest::Client] # @param name [String] The name of the channel # @param channel_options [Hash] Channel options, currently reserved for Encryption options # @option channel_options [Hash,Ably::Models::CipherParams] :cipher A hash of options or a {Ably::Models::CipherParams} to configure the encryption. *:key* is required, all other options are optional. See {Ably::Util::Crypto#initialize} for a list of +:cipher+ options # def initialize(client, name, channel_options = {}) ensure_utf_8 :name, name update_options channel_options @client = client @name = name @queue = [] @state_machine = ChannelStateMachine.new(self) @state = STATE(state_machine.current_state) @manager = ChannelManager.new(self, client.connection) setup_event_handlers setup_presence end # Publish one or more messages to the channel. # # When publishing a message, if the channel is not attached, the channel is implicitly attached # # @param name [String, Array, nil] The event name of the message to publish, or an Array of [Ably::Model::Message] objects or [Hash] objects with +:name+ and +:data+ pairs # @param data [String, ByteArray, nil] The message payload unless an Array of [Ably::Model::Message] objects passed in the first argument # @param attributes [Hash, nil] Optional additional message attributes such as :client_id or :connection_id, applied when name attribute is nil or a string # # @yield [Ably::Models::Message,Array] On success, will call the block with the {Ably::Models::Message} if a single message is publishde, or an Array of {Ably::Models::Message} when multiple messages are published # @return [Ably::Util::SafeDeferrable] Deferrable that supports both success (callback) and failure (errback) callbacks # # @example # # Publish a single message # channel.publish 'click', { x: 1, y: 2 } # # # Publish an array of message Hashes # messages = [ # { name: 'click', { x: 1, y: 2 } }, # { name: 'click', { x: 2, y: 3 } } # ] # channel.publish messages # # # Publish an array of Ably::Models::Message objects # messages = [ # Ably::Models::Message(name: 'click', { x: 1, y: 2 }) # Ably::Models::Message(name: 'click', { x: 2, y: 3 }) # ] # channel.publish messages # # channel.publish('click', 'body') do |message| # puts "#{message.name} event received with #{message.data}" # end # # channel.publish('click', 'body').errback do |error, message| # puts "#{message.name} was not received, error #{error.message}" # end # def publish(name, data = nil, attributes = {}, &success_block) if detached? || detaching? || failed? error = Ably::Exceptions::ChannelInactive.new("Cannot publish messages on a channel in state #{state}") return Ably::Util::SafeDeferrable.new_and_fail_immediately(logger, error) end if !connection.can_publish_messages? error = Ably::Exceptions::MessageQueueingDisabled.new("Message cannot be published. Client is configured to disallow queueing of messages and connection is currently #{connection.state}") return Ably::Util::SafeDeferrable.new_and_fail_immediately(logger, error) end messages = if name.kind_of?(Enumerable) name else ensure_utf_8 :name, name, allow_nil: true ensure_supported_payload data [{ name: name, data: data }.merge(attributes)] end queue_messages(messages).tap do |deferrable| deferrable.callback(&success_block) if block_given? end end # Subscribe to messages matching providing event name, or all messages if event name not provided. # # When subscribing to messages, if the channel is not attached, the channel is implicitly attached # # @param names [String] The event name of the message to subscribe to if provided. Defaults to all events. # @yield [Ably::Models::Message] For each message received, the block is called # # @return [void] # def subscribe(*names, &callback) attach unless attached? || attaching? super end # Unsubscribe the matching block for messages matching providing event name, or all messages if event name not provided. # If a block is not provided, all subscriptions will be unsubscribed # # @param names [String] The event name of the message to subscribe to if provided. Defaults to all events. # # @return [void] # def unsubscribe(*names, &callback) super end # Attach to this channel, and call the block if provided when attached. # Attaching to a channel is implicit in when a message is published or #subscribe is called, so it is uncommon # to need to call attach explicitly. # # @yield [Ably::Realtime::Channel] Block is called as soon as this channel is in the Attached state # @return [Ably::Util::SafeDeferrable] Deferrable that supports both success (callback) and failure (errback) callback # def attach(&success_block) if connection.closing? || connection.closed? || connection.suspended? || connection.failed? error = Ably::Exceptions::InvalidStateChange.new("Cannot ATTACH channel when the connection is in a closed, suspended or failed state. Connection state: #{connection.state}") return Ably::Util::SafeDeferrable.new_and_fail_immediately(logger, error) end if !attached? if detaching? # Let the pending operation complete (#RTL4h) once_state_changed { transition_state_machine :attaching if can_transition_to?(:attaching) } else transition_state_machine :attaching if can_transition_to?(:attaching) end end deferrable_for_state_change_to(STATE.Attached, &success_block) end # Detach this channel, and call the block if provided when in a Detached or Failed state # # @yield [Ably::Realtime::Channel] Block is called as soon as this channel is in the Detached or Failed state # @return [Ably::Util::SafeDeferrable] Deferrable that supports both success (callback) and failure (errback) callback # def detach(&success_block) if initialized? success_block.call if block_given? return Ably::Util::SafeDeferrable.new_and_succeed_immediately(logger) end if failed? || connection.closing? || connection.failed? return Ably::Util::SafeDeferrable.new_and_fail_immediately(logger, exception_for_state_change_to(:detaching)) end if !detached? if attaching? # Let the pending operation complete (#RTL5i) once_state_changed { transition_state_machine :detaching if can_transition_to?(:detaching) } elsif can_transition_to?(:detaching) transition_state_machine :detaching else transition_state_machine! :detached end end deferrable_for_state_change_to(STATE.Detached, &success_block) end # Presence object for this Channel. This controls this client's # presence on the channel and may also be used to obtain presence information # and change events for other members of the channel. # # @return {Ably::Realtime::Presence} # def presence @presence end # Return the message history of the channel # # If the channel is attached, you can retrieve messages published on the channel before the # channel was attached with the option until_attach: true. This is useful when a developer # wishes to display historical messages with the guarantee that no messages have been missed since attach. # # @param (see Ably::Rest::Channel#history) # @option options (see Ably::Rest::Channel#history) # @option options [Boolean] :until_attach When true, the history request will be limited only to messages published before this channel was attached. Channel must be attached # # @yield [Ably::Models::PaginatedResult] First {Ably::Models::PaginatedResult page} of {Ably::Models::Message} objects accessible with {Ably::Models::PaginatedResult#items #items}. # # @return [Ably::Util::SafeDeferrable] # def history(options = {}, &callback) if options.delete(:until_attach) unless attached? error = Ably::Exceptions::InvalidRequest.new('option :until_attach is invalid as the channel is not attached' ) return Ably::Util::SafeDeferrable.new_and_fail_immediately(logger, error) end options[:from_serial] = attached_serial end async_wrap(callback) do rest_channel.history(options.merge(async_blocking_operations: true)) end end # @!attribute [r] __incoming_msgbus__ # @return [Ably::Util::PubSub] Client library internal channel incoming message bus # @api private def __incoming_msgbus__ @__incoming_msgbus__ ||= Ably::Util::PubSub.new( coerce_into: Proc.new { |event| Ably::Models::ProtocolMessage::ACTION(event) } ) end # @api private def set_channel_error_reason(error) @error_reason = error end # @api private def clear_error_reason @error_reason = nil end # @api private def set_attached_serial(serial) @attached_serial = serial end # @api private def update_options(channel_options) @options = channel_options.clone.freeze end # Used by {Ably::Modules::StateEmitter} to debug state changes # @api private def logger client.logger end # Internal queue used for messages published that cannot yet be enqueued on the connection # @api private def __queue__ @queue end # As we are using a state machine, do not allow change_state to be used # #transition_state_machine must be used instead private :change_state private def setup_event_handlers __incoming_msgbus__.subscribe(:message) do |message| message.decode(client.encoders, options) do |encode_error, error_message| client.logger.error error_message end emit_message message.name, message end unsafe_on(STATE.Attached) do process_queue end end # Queue messages and process queue if channel is attached. # If channel is not yet attached, attempt to attach it before the message queue is processed. # @return [Ably::Util::SafeDeferrable] def queue_messages(raw_messages) messages = Array(raw_messages).map do |raw_msg| create_message(raw_msg).tap do |message| next if message.client_id.nil? if message.client_id == '*' raise Ably::Exceptions::IncompatibleClientId.new('Wildcard client_id is reserved and cannot be used when publishing messages') end if message.client_id && !message.client_id.kind_of?(String) raise Ably::Exceptions::IncompatibleClientId.new('client_id must be a String when publishing messages') end unless client.auth.can_assume_client_id?(message.client_id) raise Ably::Exceptions::IncompatibleClientId.new("Cannot publish with client_id '#{message.client_id}' as it is incompatible with the current configured client_id '#{client.client_id}'") end end end __queue__.push(*messages) if attached? process_queue else attach end if messages.count == 1 # A message is a Deferrable so, if publishing only one message, simply return that Deferrable messages.first else deferrable_for_multiple_messages(messages) end end # A deferrable object that calls the success callback once all messages are delivered # If any message fails, the errback is called immediately # Only one callback or errback is ever called i.e. if a group of messages all fail, only once # errback will be invoked def deferrable_for_multiple_messages(messages) expected_deliveries = messages.count actual_deliveries = 0 failed = false Ably::Util::SafeDeferrable.new(logger).tap do |deferrable| messages.each do |message| message.callback do next if failed actual_deliveries += 1 deferrable.succeed messages if actual_deliveries == expected_deliveries end message.errback do |error| next if failed failed = true deferrable.fail error, message end end end end def messages_in_queue? !__queue__.empty? end # Move messages from Channel Queue into Outgoing Connection Queue def process_queue condition = -> { attached? && messages_in_queue? } non_blocking_loop_while(condition) do send_messages_within_protocol_message __queue__.shift(MAX_PROTOCOL_MESSAGE_BATCH_SIZE) end end def send_messages_within_protocol_message(messages) connection.send_protocol_message( action: Ably::Models::ProtocolMessage::ACTION.Message.to_i, channel: name, messages: messages ) end def create_message(message) Ably::Models::Message(message.dup).tap do |msg| msg.encode(client.encoders, options) do |encode_error, error_message| client.logger.error error_message end end end def rest_channel client.rest_client.channel(name) end def connection client.connection end def setup_presence @presence ||= Presence.new(self) end end end end require 'ably/realtime/channel/channel_manager' require 'ably/realtime/channel/channel_state_machine'