lib/ably/realtime/channel.rb in ably-0.8.2 vs lib/ably/realtime/channel.rb in ably-0.8.3
- old
+ new
@@ -85,51 +85,78 @@
# @option channel_options [Hash] :cipher_params A hash of options to configure the encryption. *:key* is required, all other options are optional. See {Ably::Util::Crypto#initialize} for a list of +cipher_params+ options
#
def initialize(client, name, channel_options = {})
ensure_utf_8 :name, name
+ update_options channel_options
@client = client
@name = name
- @options = channel_options.clone.freeze
@queue = []
@state_machine = ChannelStateMachine.new(self)
@state = STATE(state_machine.current_state)
@manager = ChannelManager.new(self, client.connection)
setup_event_handlers
setup_presence
end
- # Publish a message on the channel.
+ # Publish one or more messages to the channel.
#
# When publishing a message, if the channel is not attached, the channel is implicitly attached
#
- # @param name [String] The event name of the message
- # @param data [String,ByteArray] payload for the message
- # @yield [Ably::Models::Message] On success, will call the block with the {Ably::Models::Message}
- # @return [Ably::Models::Message] Deferrable {Ably::Models::Message} that supports both success (callback) and failure (errback) callbacks
+ # @param name [String, Array<Ably::Models::Message|Hash>, nil] The event name of the message to publish, or an Array of [Ably::Model::Message] objects or [Hash] objects with +:name+ and +:data+ pairs
+ # @param data [String, ByteArray, nil] The message payload unless an Array of [Ably::Model::Message] objects passed in the first argument
#
+ # @yield [Ably::Models::Message,Array<Ably::Models::Message>] On success, will call the block with the {Ably::Models::Message} if a single message is publishde, or an Array of {Ably::Models::Message} when multiple messages are published
+ # @return [Ably::Util::SafeDeferrable] Deferrable that supports both success (callback) and failure (errback) callbacks
+ #
# @example
- # channel.publish('click', 'body')
+ # # Publish a single message
+ # channel.publish 'click', { x: 1, y: 2 }
#
+ # # Publish an array of message Hashes
+ # messages = [
+ # { name: 'click', { x: 1, y: 2 } },
+ # { name: 'click', { x: 2, y: 3 } }
+ # ]
+ # channel.publish messages
+ #
+ # # Publish an array of Ably::Models::Message objects
+ # messages = [
+ # Ably::Models::Message(name: 'click', { x: 1, y: 2 })
+ # Ably::Models::Message(name: 'click', { x: 2, y: 3 })
+ # ]
+ # channel.publish messages
+ #
# channel.publish('click', 'body') do |message|
# puts "#{message.name} event received with #{message.data}"
# end
#
# channel.publish('click', 'body').errback do |message, error|
# puts "#{message.name} was not received, error #{error.message}"
# end
#
- def publish(name, data, &success_block)
- ensure_utf_8 :name, name
- ensure_supported_payload data
+ def publish(name, data = nil, &success_block)
+ raise Ably::Exceptions::ChannelInactive.new('Cannot publish messages on a detached channel') if detached? || detaching?
+ raise Ably::Exceptions::ChannelInactive.new('Cannot publish messages on a failed channel') if failed?
- create_message(name, data).tap do |message|
- message.callback(&success_block) if block_given?
- queue_message message
+ if !connection.can_publish_messages?
+ raise Ably::Exceptions::MessageQueueingDisabled.new("Message cannot be published. Client is configured to disallow queueing of messages and connection is currently #{connection.state}")
end
+
+ messages = if name.kind_of?(Enumerable)
+ name
+ else
+ ensure_utf_8 :name, name, allow_nil: true
+ ensure_supported_payload data
+ [{ name: name, data: data }]
+ end
+
+ queue_messages(messages).tap do |deferrable|
+ deferrable.callback &success_block if block_given?
+ end
end
# Subscribe to messages matching providing event name, or all messages if event name not provided.
#
# When subscribing to messages, if the channel is not attached, the channel is implicitly attached
@@ -161,21 +188,31 @@
#
# @yield [Ably::Realtime::Channel] Block is called as soon as this channel is in the Attached state
# @return [Ably::Util::SafeDeferrable] Deferrable that supports both success (callback) and failure (errback) callback
#
def attach(&success_block)
+ raise Ably::Exceptions::InvalidStateChange.new("Cannot ATTACH channel when the connection is in a closed, suspended or failed state. Connection state: #{connection.state}") if connection.closing? || connection.closed? || connection.suspended? || connection.failed?
+
transition_state_machine :attaching if can_transition_to?(:attaching)
deferrable_for_state_change_to(STATE.Attached, &success_block)
end
# Detach this channel, and call the block if provided when in a Detached or Failed state
#
# @yield [Ably::Realtime::Channel] Block is called as soon as this channel is in the Detached or Failed state
- # @return [void]
+ # @return [Ably::Util::SafeDeferrable] Deferrable that supports both success (callback) and failure (errback) callback
#
def detach(&success_block)
- raise exception_for_state_change_to(:detaching) if failed? || initialized?
+ if initialized?
+ success_block.call if block_given?
+ return Ably::Util::SafeDeferrable.new(logger).tap do |deferrable|
+ EventMachine.next_tick { deferrable.succeed }
+ end
+ end
+
+ raise exception_for_state_change_to(:detaching) if failed?
+
transition_state_machine :detaching if can_transition_to?(:detaching)
deferrable_for_state_change_to(STATE.Detached, &success_block)
end
# Presence object for this Channel. This controls this client's
@@ -235,10 +272,15 @@
# @api private
def set_attached_serial(serial)
@attached_serial = serial
end
+ # @api private
+ def update_options(channel_options)
+ @options = channel_options.clone.freeze
+ end
+
# Used by {Ably::Modules::StateEmitter} to debug state changes
# @api private
def logger
client.logger
end
@@ -255,22 +297,56 @@
on(STATE.Attached) do
process_queue
end
end
- # Queue message and process queue if channel is attached.
+ # Queue messages and process queue if channel is attached.
# If channel is not yet attached, attempt to attach it before the message queue is processed.
- def queue_message(message)
- queue << message
+ # @returns [Ably::Util::SafeDeferrable]
+ def queue_messages(raw_messages)
+ messages = Array(raw_messages).map { |msg| create_message(msg) }
+ queue.push *messages
if attached?
process_queue
else
attach
end
+
+ if messages.count == 1
+ # A message is a Deferrable so, if publishing only one message, simply return that Deferrable
+ messages.first
+ else
+ deferrable_for_multiple_messages(messages)
+ end
end
+ # A deferrable object that calls the success callback once all messages are delivered
+ # If any message fails, the errback is called immediately
+ # Only one callback or errback is ever called i.e. if a group of messages all fail, only once
+ # errback will be invoked
+ def deferrable_for_multiple_messages(messages)
+ expected_deliveries = messages.count
+ actual_deliveries = 0
+ failed = false
+
+ Ably::Util::SafeDeferrable.new(logger).tap do |deferrable|
+ messages.each do |message|
+ message.callback do
+ return if failed
+ actual_deliveries += 1
+ deferrable.succeed messages if actual_deliveries == expected_deliveries
+ end
+ message.errback do |error|
+ return if failed
+ failed = true
+ deferrable.fail error, message
+ end
+ end
+ end
+ end
+
def messages_in_queue?
!queue.empty?
end
# Move messages from Channel Queue into Outgoing Connection Queue
@@ -280,22 +356,18 @@
send_messages_within_protocol_message queue.shift(MAX_PROTOCOL_MESSAGE_BATCH_SIZE)
end
end
def send_messages_within_protocol_message(messages)
- client.connection.send_protocol_message(
+ connection.send_protocol_message(
action: Ably::Models::ProtocolMessage::ACTION.Message.to_i,
channel: name,
messages: messages
)
end
- def create_message(name, data)
- message = { name: name }
- message.merge!(data: data) unless data.nil?
- message.merge!(clientId: client.client_id) if client.client_id
-
- Ably::Models::Message.new(message, logger: logger).tap do |message|
+ def create_message(message)
+ Ably::Models::Message(message.dup).tap do |message|
message.encode self
end
end
def rest_channel