lib/ably/realtime/channel.rb in ably-0.6.2 vs lib/ably/realtime/channel.rb in ably-0.7.0
- old
+ new
@@ -21,18 +21,14 @@
# Channel::STATE.Attached
# Channel::STATE.Detaching
# Channel::STATE.Detached
# Channel::STATE.Failed
#
+ # Channels emit errors - use `on(:error)` to subscribe to errors
+ #
# @!attribute [r] state
# @return {Ably::Realtime::Connection::STATE} channel state
- # @!attribute [r] client
- # @return {Ably::Realtime::Client} Ably client associated with this channel
- # @!attribute [r] name
- # @return {String} channel name
- # @!attribute [r] options
- # @return {Hash} channel options configured for this channel, see {#initialize} for channel_options
#
class Channel
include Ably::Modules::Conversions
include Ably::Modules::EventEmitter
include Ably::Modules::EventMachineHelpers
@@ -46,32 +42,57 @@
:detaching,
:detached,
:failed
)
include Ably::Modules::StateEmitter
+ include Ably::Modules::UsesStateMachine
# Max number of messages to bundle in a single ProtocolMessage
MAX_PROTOCOL_MESSAGE_BATCH_SIZE = 50
- attr_reader :client, :name, :options
+ # {Ably::Realtime::Client} associated with this channel
+ # @return [Ably::Realtime::Client]
+ attr_reader :client
+ # Channel name
+ # @return [String]
+ attr_reader :name
+
+ # Channel options configured for this channel, see {#initialize} for channel_options
+ # @return [Hash]
+ attr_reader :options
+
+ # When a channel failure occurs this attribute contains the Ably Exception
+ # @return [Ably::Models::ErrorInfo,Ably::Exceptions::BaseAblyException]
+ attr_reader :error_reason
+
+ # The Channel manager responsible for attaching, detaching and handling failures for this channel
+ # @return [Ably::Realtime::Channel::ChannelManager]
+ # @api private
+ attr_reader :manager
+
# Initialize a new Channel object
#
# @param client [Ably::Rest::Client]
# @param name [String] The name of the channel
# @param channel_options [Hash] Channel options, currently reserved for Encryption options
# @option channel_options [Boolean] :encrypted setting this to true for this channel will encrypt & decrypt all messages automatically
# @option channel_options [Hash] :cipher_params A hash of options to configure the encryption. *:key* is required, all other options are optional. See {Ably::Util::Crypto#initialize} for a list of `cipher_params` options
#
def initialize(client, name, channel_options = {})
+ ensure_utf_8 :name, name
+
@client = client
@name = name
@options = channel_options.clone.freeze
@subscriptions = Hash.new { |hash, key| hash[key] = [] }
@queue = []
- @state = STATE.Initialized
+ @state_machine = ChannelStateMachine.new(self)
+ @state = STATE(state_machine.current_state)
+ @manager = ChannelManager.new(self, client.connection)
+
setup_event_handlers
end
# Publish a message on the channel
#
@@ -89,13 +110,15 @@
#
# channel.publish('click', 'body').errback do |message, error|
# puts "#{message.name} was not received, error #{error.message}"
# end
#
- def publish(name, data, &callback)
+ def publish(name, data, &success_block)
+ ensure_utf_8 :name, name
+
create_message(name, data).tap do |message|
- message.callback(&callback) if block_given?
+ message.callback(&success_block) if block_given?
queue_message message
end
end
# Subscribe to messages matching providing event name, or all messages if event name not provided
@@ -103,77 +126,65 @@
# @param name [String] The event name of the message to subscribe to if provided. Defaults to all events.
# @yield [Ably::Models::Message] For each message received, the block is called
#
# @return [void]
#
- def subscribe(name = :all, &blk)
+ def subscribe(name = :all, &callback)
attach unless attached? || attaching?
- subscriptions[message_name_key(name)] << blk
+ subscriptions[message_name_key(name)] << callback
end
# Unsubscribe the matching block for messages matching providing event name, or all messages if event name not provided.
# If a block is not provided, all subscriptions will be unsubscribed
#
# @param name [String] The event name of the message to subscribe to if provided. Defaults to all events.
#
# @return [void]
#
- def unsubscribe(name = :all, &blk)
+ def unsubscribe(name = :all, &callback)
if message_name_key(name) == :all
subscriptions.keys
else
Array(message_name_key(name))
end.each do |key|
subscriptions[key].delete_if do |block|
- !block_given? || blk == block
+ !block_given? || callback == block
end
end
end
# Attach to this channel, and call the block if provided when attached.
# Attaching to a channel is implicit in when a message is published or #subscribe is called, so it is uncommon
# to need to call attach explicitly.
#
# @yield [Ably::Realtime::Channel] Block is called as soon as this channel is in the Attached state
- # @return [void]
+ # @return [EventMachine::Deferrable] Deferrable that supports both success (callback) and failure (errback) callback
#
- def attach(&block)
- if attached?
- block.call self if block_given?
- else
- once(STATE.Attached) { block.call self } if block_given?
- if !attaching?
- change_state STATE.Attaching
- send_attach_protocol_message
- end
- end
+ def attach(&success_block)
+ transition_state_machine :attaching if can_transition_to?(:attaching)
+ deferrable_for_state_change_to(STATE.Attached, &success_block)
end
# Detach this channel, and call the block if provided when in a Detached or Failed state
#
# @yield [Ably::Realtime::Channel] Block is called as soon as this channel is in the Detached or Failed state
# @return [void]
#
- def detach(&block)
- if detached? || failed?
- block.call self if block_given?
- else
- once(STATE.Detached, STATE.Failed) { block.call self } if block_given?
- if !detaching?
- change_state STATE.Detaching
- send_detach_protocol_message
- end
- end
+ def detach(&success_block)
+ raise exception_for_state_change_to(:detaching) if failed? || initialized?
+ transition_state_machine :detaching if can_transition_to?(:detaching)
+ deferrable_for_state_change_to(STATE.Detached, &success_block)
end
# Presence object for this Channel. This controls this client's
# presence on the channel and may also be used to obtain presence information
# and change events for other members of the channel.
#
# @return {Ably::Realtime::Presence}
#
def presence
+ attach if initialized?
@presence ||= Presence.new(self)
end
# Return the message history of the channel
#
@@ -196,10 +207,15 @@
@__incoming_msgbus__ ||= Ably::Util::PubSub.new(
coerce_into: Proc.new { |event| Ably::Models::ProtocolMessage::ACTION(event) }
)
end
+ # @api private
+ def set_failed_channel_error_reason(error)
+ @error_reason = error
+ end
+
private
attr_reader :queue, :subscriptions
def setup_event_handlers
__incoming_msgbus__.subscribe(:message) do |message|
@@ -210,18 +226,10 @@
end
on(STATE.Attached) do
process_queue
end
-
- connection.on(Connection::STATE.Closed) do
- change_state STATE.Detached
- end
-
- connection.on(Connection::STATE.Failed) do
- change_state STATE.Failed unless detached? || initialized?
- end
end
# Queue message and process queue if channel is attached.
# If channel is not yet attached, attempt to attach it before the message queue is processed.
def queue_message(message)
@@ -249,24 +257,9 @@
def send_messages_within_protocol_message(messages)
client.connection.send_protocol_message(
action: Ably::Models::ProtocolMessage::ACTION.Message.to_i,
channel: name,
messages: messages
- )
- end
-
- def send_attach_protocol_message
- send_state_change_protocol_message Ably::Models::ProtocolMessage::ACTION.Attach
- end
-
- def send_detach_protocol_message
- send_state_change_protocol_message Ably::Models::ProtocolMessage::ACTION.Detach
- end
-
- def send_state_change_protocol_message(state)
- client.connection.send_protocol_message(
- action: state.to_i,
- channel: name
)
end
def create_message(name, data)
message = { name: name }