lib/submodules/ably-ruby/lib/ably/realtime/channel/channel_manager.rb in ably-rest-0.9.3 vs lib/submodules/ably-ruby/lib/ably/realtime/channel/channel_manager.rb in ably-rest-1.0.0
- old
+ new
@@ -10,12 +10,10 @@
extend Forwardable
def initialize(channel, connection)
@channel = channel
@connection = connection
-
- setup_connection_event_handlers
end
# Commence attachment
def attach
if can_transition_to?(:attached)
@@ -23,145 +21,236 @@
send_attach_protocol_message
end
end
# Commence attachment
- def detach(error = nil)
+ def detach(error, previous_state)
if connection.closed? || connection.connecting? || connection.suspended?
channel.transition_state_machine :detached, reason: error
elsif can_transition_to?(:detached)
- send_detach_protocol_message
+ send_detach_protocol_message previous_state
end
end
# Channel is attached, notify presence if sync is expected
def attached(attached_protocol_message)
- if attached_protocol_message.has_presence_flag?
- channel.presence.manager.sync_expected
- else
- channel.presence.manager.sync_not_expected
+ # If no attached ProtocolMessage then this attached request was triggered by the client
+ # library, such as returning to attached whne detach has failed
+ if attached_protocol_message
+ update_presence_sync_state_following_attached attached_protocol_message
+ channel.set_attached_serial attached_protocol_message.channel_serial
end
- channel.set_attached_serial attached_protocol_message.channel_serial
end
# An error has occurred on the channel
- def emit_error(error)
- logger.error "ChannelManager: Channel '#{channel.name}' error: #{error}"
- channel.emit :error, error
+ def log_channel_error(error)
+ logger.error { "ChannelManager: Channel '#{channel.name}' error: #{error}" }
end
- # Detach a channel as a result of an error
- def suspend(error)
- channel.transition_state_machine! :detaching, reason: error
+ # Request channel to be reattached by sending an attach protocol message
+ # @param [Hash] options
+ # @option options [Ably::Models::ErrorInfo] :reason
+ def request_reattach(options = {})
+ reason = options[:reason]
+ send_attach_protocol_message
+ logger.debug { "Explicit channel reattach request sent to Ably due to #{reason}" }
+ channel.set_channel_error_reason(reason) if reason
+ channel.transition_state_machine! :attaching, reason: reason unless channel.attaching?
end
- # When a channel is no longer attached or has failed,
- # all messages awaiting an ACK response should fail immediately
- def fail_messages_awaiting_ack(error)
- # Allow a short time for other queued operations to complete before failing all messages
- EventMachine.add_timer(0.1) do
- error = Ably::Exceptions::MessageDeliveryFailed.new("Channel cannot publish messages whilst state is '#{channel.state}'") unless error
+ def duplicate_attached_received(protocol_message)
+ if protocol_message.error
+ channel.set_channel_error_reason protocol_message.error
+ log_channel_error protocol_message.error
+ end
+
+ if protocol_message.has_channel_resumed_flag?
+ logger.debug { "ChannelManager: Additional resumed ATTACHED message received for #{channel.state} channel '#{channel.name}'" }
+ else
+ channel.emit :update, Ably::Models::ChannelStateChange.new(
+ current: channel.state,
+ previous: channel.state,
+ event: Ably::Realtime::Channel::EVENT(:update),
+ reason: protocol_message.error,
+ resumed: false,
+ )
+ update_presence_sync_state_following_attached protocol_message
+ end
+
+ channel.set_attached_serial protocol_message.channel_serial
+ end
+
+ # Handle DETACED messages, see #RTL13 for server-initated detaches
+ def detached_received(reason)
+ case channel.state.to_sym
+ when :detaching
+ channel.transition_state_machine :detached, reason: reason
+ when :attached, :suspended
+ channel.transition_state_machine :attaching, reason: reason
+ else
+ logger.debug { "ChannelManager: DETACHED ProtocolMessage received, but no action to take as not DETACHING, ATTACHED OR SUSPENDED" }
+ end
+ end
+
+ # When continuity on the connection is interrupted or channel becomes suspended (implying loss of continuity)
+ # then all messages published but awaiting an ACK from Ably should be failed with a NACK
+ # @param [Hash] options
+ # @option options [Boolean] :immediately
+ def fail_messages_awaiting_ack(error, options = {})
+ immediately = options[:immediately] || false
+
+ fail_proc = Proc.new do
+ error = Ably::Exceptions::MessageDeliveryFailed.new("Continuity of connection was lost so published messages awaiting ACK have failed") unless error
fail_messages_in_queue connection.__pending_message_ack_queue__, error
- fail_messages_in_queue connection.__outgoing_message_queue__, error
end
+
+ # Allow a short time for other queued operations to complete before failing all messages
+ if immediately
+ fail_proc.call
+ else
+ EventMachine.add_timer(0.1) { fail_proc.call }
+ end
end
+ # When a channel becomes detached, suspended or failed,
+ # all queued messages should be failed immediately as we don't queue in
+ # any of those states
+ def fail_queued_messages(error)
+ error = Ably::Exceptions::MessageDeliveryFailed.new("Queued messages on channel '#{channel.name}' in state '#{channel.state}' will never be delivered") unless error
+ fail_messages_in_queue connection.__outgoing_message_queue__, error
+ channel.__queue__.each do |message|
+ nack_message message, error
+ end
+ channel.__queue__.clear
+ end
+
def fail_messages_in_queue(queue, error)
queue.delete_if do |protocol_message|
- if protocol_message.channel == channel.name
- nack_messages protocol_message, error
- true
+ if protocol_message.action.match_any?(:presence, :message)
+ if protocol_message.channel == channel.name
+ nack_messages protocol_message, error
+ true
+ end
end
end
end
def nack_messages(protocol_message, error)
(protocol_message.messages + protocol_message.presence).each do |message|
- logger.debug "Calling NACK failure callbacks for #{message.class.name} - #{message.to_json}, protocol message: #{protocol_message}"
- message.fail error
+ nack_message message, error, protocol_message
end
- logger.debug "Calling NACK failure callbacks for #{protocol_message.class.name} - #{protocol_message.to_json}"
+ logger.debug { "Calling NACK failure callbacks for #{protocol_message.class.name} - #{protocol_message.to_json}" }
protocol_message.fail error
end
+ def nack_message(message, error, protocol_message = nil)
+ logger.debug { "Calling NACK failure callbacks for #{message.class.name} - #{message.to_json} #{"protocol message: #{protocol_message}" if protocol_message}" }
+ message.fail error
+ end
+
def drop_pending_queue_from_ack(ack_protocol_message)
message_serial_up_to = ack_protocol_message.message_serial + ack_protocol_message.count - 1
connection.__pending_message_ack_queue__.drop_while do |protocol_message|
if protocol_message.message_serial <= message_serial_up_to
yield protocol_message
true
end
end
end
+ # If the connection is still connected and the channel still suspended after
+ # channel_retry_timeout has passed, then attempt to reattach automatically, see #RTL13b
+ def start_attach_from_suspended_timer
+ cancel_attach_from_suspended_timer
+ if connection.connected?
+ channel.unsafe_once { |event| cancel_attach_from_suspended_timer unless event == :update }
+ connection.unsafe_once { |event| cancel_attach_from_suspended_timer unless event == :update }
+
+ @attach_from_suspended_timer = EventMachine::Timer.new(channel_retry_timeout) do
+ channel.transition_state_machine! :attaching
+ end
+ end
+ end
+
private
+ attr_reader :pending_state_change_timer
+
def channel
@channel
end
def connection
@connection
end
def_delegators :channel, :can_transition_to?
+ def cancel_attach_from_suspended_timer
+ @attach_from_suspended_timer.cancel if @attach_from_suspended_timer
+ @attach_from_suspended_timer = nil
+ end
+
# If the connection has not previously connected, connect now
def connect_if_connection_initialized
connection.connect if connection.initialized?
end
- def send_attach_protocol_message
- send_state_change_protocol_message Ably::Models::ProtocolMessage::ACTION.Attach
+ def realtime_request_timeout
+ connection.defaults.fetch(:realtime_request_timeout)
end
- def send_detach_protocol_message
- send_state_change_protocol_message Ably::Models::ProtocolMessage::ACTION.Detach
+ def channel_retry_timeout
+ connection.defaults.fetch(:channel_retry_timeout)
end
- def send_state_change_protocol_message(state)
- connection.send_protocol_message(
- action: state.to_i,
- channel: channel.name
- )
+ def send_attach_protocol_message
+ send_state_change_protocol_message Ably::Models::ProtocolMessage::ACTION.Attach, :suspended # move to suspended
end
- # Any message sent before an ACK/NACK was received on the previous transport
- # needs to be resent to the Ably service so that a subsequent ACK/NACK is received.
- # It is up to Ably to ensure that duplicate messages are not retransmitted on the channel
- # base on the serial numbers
- #
- # TODO: Move this into the Connection class, it does not belong in a Channel class
- #
- # @api private
- def resend_pending_message_ack_queue
- connection.__pending_message_ack_queue__.delete_if do |protocol_message|
- if protocol_message.channel == channel.name
- connection.__outgoing_message_queue__ << protocol_message
- connection.__outgoing_protocol_msgbus__.publish :protocol_message
- true
- end
- end
+ def send_detach_protocol_message(previous_state)
+ send_state_change_protocol_message Ably::Models::ProtocolMessage::ACTION.Detach, previous_state # return to previous state if failed
end
- def setup_connection_event_handlers
- connection.unsafe_on(:closed) do
- channel.transition_state_machine :detaching if can_transition_to?(:detaching)
+ def send_state_change_protocol_message(new_state, state_if_failed)
+ state_at_time_of_request = channel.state
+ @pending_state_change_timer = EventMachine::Timer.new(realtime_request_timeout) do
+ if channel.state == state_at_time_of_request
+ error = Ably::Models::ErrorInfo.new(code: 90007, message: "Channel #{new_state} operation failed (timed out)")
+ channel.transition_state_machine state_if_failed, reason: error
+ end
end
- connection.unsafe_on(:suspended) do |error|
- if can_transition_to?(:detaching)
- channel.transition_state_machine :detaching, reason: Ably::Exceptions::ConnectionSuspended.new('Connection suspended', nil, 80002, error)
- end
+ channel.once_state_changed do
+ @pending_state_change_timer.cancel if @pending_state_change_timer
+ @pending_state_change_timer = nil
end
- connection.unsafe_on(:failed) do |error|
- if can_transition_to?(:failed) && !channel.detached?
- channel.transition_state_machine :failed, reason: Ably::Exceptions::ConnectionFailed.new('Connection failed', nil, 80002, error)
+ resend_if_disconnected_and_connected = Proc.new do
+ connection.unsafe_once(:disconnected) do
+ next unless pending_state_change_timer
+ connection.unsafe_once(:connected) do
+ next unless pending_state_change_timer
+ connection.send_protocol_message(
+ action: new_state.to_i,
+ channel: channel.name
+ )
+ resend_if_disconnected_and_connected.call
+ end
end
end
+ resend_if_disconnected_and_connected.call
- connection.unsafe_on(:connected) do |error|
- resend_pending_message_ack_queue
+ connection.send_protocol_message(
+ action: new_state.to_i,
+ channel: channel.name
+ )
+ end
+
+ def update_presence_sync_state_following_attached(attached_protocol_message)
+ if attached_protocol_message.has_presence_flag?
+ channel.presence.manager.sync_expected
+ else
+ channel.presence.manager.sync_not_expected
end
end
def logger
connection.logger