lib/submodules/ably-ruby/lib/ably/realtime/channel/channel_manager.rb in ably-rest-0.9.3 vs lib/submodules/ably-ruby/lib/ably/realtime/channel/channel_manager.rb in ably-rest-1.0.0

- old
+ new

@@ -10,12 +10,10 @@ extend Forwardable def initialize(channel, connection) @channel = channel @connection = connection - - setup_connection_event_handlers end # Commence attachment def attach if can_transition_to?(:attached) @@ -23,145 +21,236 @@ send_attach_protocol_message end end # Commence attachment - def detach(error = nil) + def detach(error, previous_state) if connection.closed? || connection.connecting? || connection.suspended? channel.transition_state_machine :detached, reason: error elsif can_transition_to?(:detached) - send_detach_protocol_message + send_detach_protocol_message previous_state end end # Channel is attached, notify presence if sync is expected def attached(attached_protocol_message) - if attached_protocol_message.has_presence_flag? - channel.presence.manager.sync_expected - else - channel.presence.manager.sync_not_expected + # If no attached ProtocolMessage then this attached request was triggered by the client + # library, such as returning to attached whne detach has failed + if attached_protocol_message + update_presence_sync_state_following_attached attached_protocol_message + channel.set_attached_serial attached_protocol_message.channel_serial end - channel.set_attached_serial attached_protocol_message.channel_serial end # An error has occurred on the channel - def emit_error(error) - logger.error "ChannelManager: Channel '#{channel.name}' error: #{error}" - channel.emit :error, error + def log_channel_error(error) + logger.error { "ChannelManager: Channel '#{channel.name}' error: #{error}" } end - # Detach a channel as a result of an error - def suspend(error) - channel.transition_state_machine! :detaching, reason: error + # Request channel to be reattached by sending an attach protocol message + # @param [Hash] options + # @option options [Ably::Models::ErrorInfo] :reason + def request_reattach(options = {}) + reason = options[:reason] + send_attach_protocol_message + logger.debug { "Explicit channel reattach request sent to Ably due to #{reason}" } + channel.set_channel_error_reason(reason) if reason + channel.transition_state_machine! :attaching, reason: reason unless channel.attaching? end - # When a channel is no longer attached or has failed, - # all messages awaiting an ACK response should fail immediately - def fail_messages_awaiting_ack(error) - # Allow a short time for other queued operations to complete before failing all messages - EventMachine.add_timer(0.1) do - error = Ably::Exceptions::MessageDeliveryFailed.new("Channel cannot publish messages whilst state is '#{channel.state}'") unless error + def duplicate_attached_received(protocol_message) + if protocol_message.error + channel.set_channel_error_reason protocol_message.error + log_channel_error protocol_message.error + end + + if protocol_message.has_channel_resumed_flag? + logger.debug { "ChannelManager: Additional resumed ATTACHED message received for #{channel.state} channel '#{channel.name}'" } + else + channel.emit :update, Ably::Models::ChannelStateChange.new( + current: channel.state, + previous: channel.state, + event: Ably::Realtime::Channel::EVENT(:update), + reason: protocol_message.error, + resumed: false, + ) + update_presence_sync_state_following_attached protocol_message + end + + channel.set_attached_serial protocol_message.channel_serial + end + + # Handle DETACED messages, see #RTL13 for server-initated detaches + def detached_received(reason) + case channel.state.to_sym + when :detaching + channel.transition_state_machine :detached, reason: reason + when :attached, :suspended + channel.transition_state_machine :attaching, reason: reason + else + logger.debug { "ChannelManager: DETACHED ProtocolMessage received, but no action to take as not DETACHING, ATTACHED OR SUSPENDED" } + end + end + + # When continuity on the connection is interrupted or channel becomes suspended (implying loss of continuity) + # then all messages published but awaiting an ACK from Ably should be failed with a NACK + # @param [Hash] options + # @option options [Boolean] :immediately + def fail_messages_awaiting_ack(error, options = {}) + immediately = options[:immediately] || false + + fail_proc = Proc.new do + error = Ably::Exceptions::MessageDeliveryFailed.new("Continuity of connection was lost so published messages awaiting ACK have failed") unless error fail_messages_in_queue connection.__pending_message_ack_queue__, error - fail_messages_in_queue connection.__outgoing_message_queue__, error end + + # Allow a short time for other queued operations to complete before failing all messages + if immediately + fail_proc.call + else + EventMachine.add_timer(0.1) { fail_proc.call } + end end + # When a channel becomes detached, suspended or failed, + # all queued messages should be failed immediately as we don't queue in + # any of those states + def fail_queued_messages(error) + error = Ably::Exceptions::MessageDeliveryFailed.new("Queued messages on channel '#{channel.name}' in state '#{channel.state}' will never be delivered") unless error + fail_messages_in_queue connection.__outgoing_message_queue__, error + channel.__queue__.each do |message| + nack_message message, error + end + channel.__queue__.clear + end + def fail_messages_in_queue(queue, error) queue.delete_if do |protocol_message| - if protocol_message.channel == channel.name - nack_messages protocol_message, error - true + if protocol_message.action.match_any?(:presence, :message) + if protocol_message.channel == channel.name + nack_messages protocol_message, error + true + end end end end def nack_messages(protocol_message, error) (protocol_message.messages + protocol_message.presence).each do |message| - logger.debug "Calling NACK failure callbacks for #{message.class.name} - #{message.to_json}, protocol message: #{protocol_message}" - message.fail error + nack_message message, error, protocol_message end - logger.debug "Calling NACK failure callbacks for #{protocol_message.class.name} - #{protocol_message.to_json}" + logger.debug { "Calling NACK failure callbacks for #{protocol_message.class.name} - #{protocol_message.to_json}" } protocol_message.fail error end + def nack_message(message, error, protocol_message = nil) + logger.debug { "Calling NACK failure callbacks for #{message.class.name} - #{message.to_json} #{"protocol message: #{protocol_message}" if protocol_message}" } + message.fail error + end + def drop_pending_queue_from_ack(ack_protocol_message) message_serial_up_to = ack_protocol_message.message_serial + ack_protocol_message.count - 1 connection.__pending_message_ack_queue__.drop_while do |protocol_message| if protocol_message.message_serial <= message_serial_up_to yield protocol_message true end end end + # If the connection is still connected and the channel still suspended after + # channel_retry_timeout has passed, then attempt to reattach automatically, see #RTL13b + def start_attach_from_suspended_timer + cancel_attach_from_suspended_timer + if connection.connected? + channel.unsafe_once { |event| cancel_attach_from_suspended_timer unless event == :update } + connection.unsafe_once { |event| cancel_attach_from_suspended_timer unless event == :update } + + @attach_from_suspended_timer = EventMachine::Timer.new(channel_retry_timeout) do + channel.transition_state_machine! :attaching + end + end + end + private + attr_reader :pending_state_change_timer + def channel @channel end def connection @connection end def_delegators :channel, :can_transition_to? + def cancel_attach_from_suspended_timer + @attach_from_suspended_timer.cancel if @attach_from_suspended_timer + @attach_from_suspended_timer = nil + end + # If the connection has not previously connected, connect now def connect_if_connection_initialized connection.connect if connection.initialized? end - def send_attach_protocol_message - send_state_change_protocol_message Ably::Models::ProtocolMessage::ACTION.Attach + def realtime_request_timeout + connection.defaults.fetch(:realtime_request_timeout) end - def send_detach_protocol_message - send_state_change_protocol_message Ably::Models::ProtocolMessage::ACTION.Detach + def channel_retry_timeout + connection.defaults.fetch(:channel_retry_timeout) end - def send_state_change_protocol_message(state) - connection.send_protocol_message( - action: state.to_i, - channel: channel.name - ) + def send_attach_protocol_message + send_state_change_protocol_message Ably::Models::ProtocolMessage::ACTION.Attach, :suspended # move to suspended end - # Any message sent before an ACK/NACK was received on the previous transport - # needs to be resent to the Ably service so that a subsequent ACK/NACK is received. - # It is up to Ably to ensure that duplicate messages are not retransmitted on the channel - # base on the serial numbers - # - # TODO: Move this into the Connection class, it does not belong in a Channel class - # - # @api private - def resend_pending_message_ack_queue - connection.__pending_message_ack_queue__.delete_if do |protocol_message| - if protocol_message.channel == channel.name - connection.__outgoing_message_queue__ << protocol_message - connection.__outgoing_protocol_msgbus__.publish :protocol_message - true - end - end + def send_detach_protocol_message(previous_state) + send_state_change_protocol_message Ably::Models::ProtocolMessage::ACTION.Detach, previous_state # return to previous state if failed end - def setup_connection_event_handlers - connection.unsafe_on(:closed) do - channel.transition_state_machine :detaching if can_transition_to?(:detaching) + def send_state_change_protocol_message(new_state, state_if_failed) + state_at_time_of_request = channel.state + @pending_state_change_timer = EventMachine::Timer.new(realtime_request_timeout) do + if channel.state == state_at_time_of_request + error = Ably::Models::ErrorInfo.new(code: 90007, message: "Channel #{new_state} operation failed (timed out)") + channel.transition_state_machine state_if_failed, reason: error + end end - connection.unsafe_on(:suspended) do |error| - if can_transition_to?(:detaching) - channel.transition_state_machine :detaching, reason: Ably::Exceptions::ConnectionSuspended.new('Connection suspended', nil, 80002, error) - end + channel.once_state_changed do + @pending_state_change_timer.cancel if @pending_state_change_timer + @pending_state_change_timer = nil end - connection.unsafe_on(:failed) do |error| - if can_transition_to?(:failed) && !channel.detached? - channel.transition_state_machine :failed, reason: Ably::Exceptions::ConnectionFailed.new('Connection failed', nil, 80002, error) + resend_if_disconnected_and_connected = Proc.new do + connection.unsafe_once(:disconnected) do + next unless pending_state_change_timer + connection.unsafe_once(:connected) do + next unless pending_state_change_timer + connection.send_protocol_message( + action: new_state.to_i, + channel: channel.name + ) + resend_if_disconnected_and_connected.call + end end end + resend_if_disconnected_and_connected.call - connection.unsafe_on(:connected) do |error| - resend_pending_message_ack_queue + connection.send_protocol_message( + action: new_state.to_i, + channel: channel.name + ) + end + + def update_presence_sync_state_following_attached(attached_protocol_message) + if attached_protocol_message.has_presence_flag? + channel.presence.manager.sync_expected + else + channel.presence.manager.sync_not_expected end end def logger connection.logger