lib/ably/realtime/channel.rb in ably-0.8.2 vs lib/ably/realtime/channel.rb in ably-0.8.3

- old
+ new

@@ -85,51 +85,78 @@ # @option channel_options [Hash] :cipher_params A hash of options to configure the encryption. *:key* is required, all other options are optional. See {Ably::Util::Crypto#initialize} for a list of +cipher_params+ options # def initialize(client, name, channel_options = {}) ensure_utf_8 :name, name + update_options channel_options @client = client @name = name - @options = channel_options.clone.freeze @queue = [] @state_machine = ChannelStateMachine.new(self) @state = STATE(state_machine.current_state) @manager = ChannelManager.new(self, client.connection) setup_event_handlers setup_presence end - # Publish a message on the channel. + # Publish one or more messages to the channel. # # When publishing a message, if the channel is not attached, the channel is implicitly attached # - # @param name [String] The event name of the message - # @param data [String,ByteArray] payload for the message - # @yield [Ably::Models::Message] On success, will call the block with the {Ably::Models::Message} - # @return [Ably::Models::Message] Deferrable {Ably::Models::Message} that supports both success (callback) and failure (errback) callbacks + # @param name [String, Array<Ably::Models::Message|Hash>, nil] The event name of the message to publish, or an Array of [Ably::Model::Message] objects or [Hash] objects with +:name+ and +:data+ pairs + # @param data [String, ByteArray, nil] The message payload unless an Array of [Ably::Model::Message] objects passed in the first argument # + # @yield [Ably::Models::Message,Array<Ably::Models::Message>] On success, will call the block with the {Ably::Models::Message} if a single message is publishde, or an Array of {Ably::Models::Message} when multiple messages are published + # @return [Ably::Util::SafeDeferrable] Deferrable that supports both success (callback) and failure (errback) callbacks + # # @example - # channel.publish('click', 'body') + # # Publish a single message + # channel.publish 'click', { x: 1, y: 2 } # + # # Publish an array of message Hashes + # messages = [ + # { name: 'click', { x: 1, y: 2 } }, + # { name: 'click', { x: 2, y: 3 } } + # ] + # channel.publish messages + # + # # Publish an array of Ably::Models::Message objects + # messages = [ + # Ably::Models::Message(name: 'click', { x: 1, y: 2 }) + # Ably::Models::Message(name: 'click', { x: 2, y: 3 }) + # ] + # channel.publish messages + # # channel.publish('click', 'body') do |message| # puts "#{message.name} event received with #{message.data}" # end # # channel.publish('click', 'body').errback do |message, error| # puts "#{message.name} was not received, error #{error.message}" # end # - def publish(name, data, &success_block) - ensure_utf_8 :name, name - ensure_supported_payload data + def publish(name, data = nil, &success_block) + raise Ably::Exceptions::ChannelInactive.new('Cannot publish messages on a detached channel') if detached? || detaching? + raise Ably::Exceptions::ChannelInactive.new('Cannot publish messages on a failed channel') if failed? - create_message(name, data).tap do |message| - message.callback(&success_block) if block_given? - queue_message message + if !connection.can_publish_messages? + raise Ably::Exceptions::MessageQueueingDisabled.new("Message cannot be published. Client is configured to disallow queueing of messages and connection is currently #{connection.state}") end + + messages = if name.kind_of?(Enumerable) + name + else + ensure_utf_8 :name, name, allow_nil: true + ensure_supported_payload data + [{ name: name, data: data }] + end + + queue_messages(messages).tap do |deferrable| + deferrable.callback &success_block if block_given? + end end # Subscribe to messages matching providing event name, or all messages if event name not provided. # # When subscribing to messages, if the channel is not attached, the channel is implicitly attached @@ -161,21 +188,31 @@ # # @yield [Ably::Realtime::Channel] Block is called as soon as this channel is in the Attached state # @return [Ably::Util::SafeDeferrable] Deferrable that supports both success (callback) and failure (errback) callback # def attach(&success_block) + raise Ably::Exceptions::InvalidStateChange.new("Cannot ATTACH channel when the connection is in a closed, suspended or failed state. Connection state: #{connection.state}") if connection.closing? || connection.closed? || connection.suspended? || connection.failed? + transition_state_machine :attaching if can_transition_to?(:attaching) deferrable_for_state_change_to(STATE.Attached, &success_block) end # Detach this channel, and call the block if provided when in a Detached or Failed state # # @yield [Ably::Realtime::Channel] Block is called as soon as this channel is in the Detached or Failed state - # @return [void] + # @return [Ably::Util::SafeDeferrable] Deferrable that supports both success (callback) and failure (errback) callback # def detach(&success_block) - raise exception_for_state_change_to(:detaching) if failed? || initialized? + if initialized? + success_block.call if block_given? + return Ably::Util::SafeDeferrable.new(logger).tap do |deferrable| + EventMachine.next_tick { deferrable.succeed } + end + end + + raise exception_for_state_change_to(:detaching) if failed? + transition_state_machine :detaching if can_transition_to?(:detaching) deferrable_for_state_change_to(STATE.Detached, &success_block) end # Presence object for this Channel. This controls this client's @@ -235,10 +272,15 @@ # @api private def set_attached_serial(serial) @attached_serial = serial end + # @api private + def update_options(channel_options) + @options = channel_options.clone.freeze + end + # Used by {Ably::Modules::StateEmitter} to debug state changes # @api private def logger client.logger end @@ -255,22 +297,56 @@ on(STATE.Attached) do process_queue end end - # Queue message and process queue if channel is attached. + # Queue messages and process queue if channel is attached. # If channel is not yet attached, attempt to attach it before the message queue is processed. - def queue_message(message) - queue << message + # @returns [Ably::Util::SafeDeferrable] + def queue_messages(raw_messages) + messages = Array(raw_messages).map { |msg| create_message(msg) } + queue.push *messages if attached? process_queue else attach end + + if messages.count == 1 + # A message is a Deferrable so, if publishing only one message, simply return that Deferrable + messages.first + else + deferrable_for_multiple_messages(messages) + end end + # A deferrable object that calls the success callback once all messages are delivered + # If any message fails, the errback is called immediately + # Only one callback or errback is ever called i.e. if a group of messages all fail, only once + # errback will be invoked + def deferrable_for_multiple_messages(messages) + expected_deliveries = messages.count + actual_deliveries = 0 + failed = false + + Ably::Util::SafeDeferrable.new(logger).tap do |deferrable| + messages.each do |message| + message.callback do + return if failed + actual_deliveries += 1 + deferrable.succeed messages if actual_deliveries == expected_deliveries + end + message.errback do |error| + return if failed + failed = true + deferrable.fail error, message + end + end + end + end + def messages_in_queue? !queue.empty? end # Move messages from Channel Queue into Outgoing Connection Queue @@ -280,22 +356,18 @@ send_messages_within_protocol_message queue.shift(MAX_PROTOCOL_MESSAGE_BATCH_SIZE) end end def send_messages_within_protocol_message(messages) - client.connection.send_protocol_message( + connection.send_protocol_message( action: Ably::Models::ProtocolMessage::ACTION.Message.to_i, channel: name, messages: messages ) end - def create_message(name, data) - message = { name: name } - message.merge!(data: data) unless data.nil? - message.merge!(clientId: client.client_id) if client.client_id - - Ably::Models::Message.new(message, logger: logger).tap do |message| + def create_message(message) + Ably::Models::Message(message.dup).tap do |message| message.encode self end end def rest_channel