lib/submodules/ably-ruby/lib/ably/realtime/channel/channel_manager.rb in ably-rest-1.2.4 vs lib/submodules/ably-ruby/lib/ably/realtime/channel/channel_manager.rb in ably-rest-1.2.6

- old
+ new

@@ -16,11 +16,11 @@ # Commence attachment def attach if can_transition_to?(:attached) connect_if_connection_initialized - send_attach_protocol_message + send_attach_protocol_message if connection.state?(:connected) # RTL4i end end # Commence attachment def detach(error, previous_state) @@ -32,13 +32,13 @@ end # Channel is attached, notify presence if sync is expected def attached(attached_protocol_message) # If no attached ProtocolMessage then this attached request was triggered by the client - # library, such as returning to attached whne detach has failed + # library, such as returning to attached when detach has failed if attached_protocol_message - update_presence_sync_state_following_attached attached_protocol_message + channel.presence.manager.on_attach attached_protocol_message.has_presence_flag? channel.properties.set_attach_serial(attached_protocol_message.channel_serial) channel.options.set_modes_from_flags(attached_protocol_message.flags) channel.options.set_params(attached_protocol_message.params) end end @@ -47,40 +47,37 @@ def log_channel_error(error) logger.error { "ChannelManager: Channel '#{channel.name}' error: #{error}" } end # Request channel to be reattached by sending an attach protocol message - # @param [Hash] options - # @option options [Ably::Models::ErrorInfo] :reason - def request_reattach(options = {}) - reason = options[:reason] - send_attach_protocol_message - logger.debug { "Explicit channel reattach request sent to Ably due to #{reason}" } + # @param [Ably::Models::ErrorInfo] reason + def request_reattach(reason = nil) channel.set_channel_error_reason(reason) if reason channel.transition_state_machine! :attaching, reason: reason unless channel.attaching? + send_attach_protocol_message + logger.debug { "Explicit channel reattach request sent to Ably due to #{reason}" } end def duplicate_attached_received(protocol_message) + logger.debug { "Server initiated ATTACHED message received for channel '#{channel.name}' with state #{channel.state}" } if protocol_message.error channel.set_channel_error_reason protocol_message.error log_channel_error protocol_message.error end channel.properties.set_attach_serial(protocol_message.channel_serial) channel.options.set_modes_from_flags(protocol_message.flags) - if protocol_message.has_channel_resumed_flag? - logger.debug { "ChannelManager: Additional resumed ATTACHED message received for #{channel.state} channel '#{channel.name}'" } - else + unless protocol_message.has_channel_resumed_flag? channel.emit :update, Ably::Models::ChannelStateChange.new( current: channel.state, previous: channel.state, event: Ably::Realtime::Channel::EVENT(:update), reason: protocol_message.error, resumed: false, ) - update_presence_sync_state_following_attached protocol_message + channel.presence.manager.on_attach protocol_message.has_presence_flag? end end # Handle DETACED messages, see #RTL13 for server-initated detaches def detached_received(reason) @@ -168,10 +165,16 @@ channel.transition_state_machine! :attaching end end end + # RTL13c + def notify_state_change + @pending_state_change_timer.cancel if @pending_state_change_timer + @pending_state_change_timer = nil + end + private attr_reader :pending_state_change_timer def channel @channel @@ -207,59 +210,59 @@ message_options[:flags] = channel.options.modes_to_flags if channel.options.modes if channel.attach_resume message_options[:flags] = message_options[:flags].to_i | Ably::Models::ProtocolMessage::ATTACH_FLAGS_MAPPING[:resume] end - send_state_change_protocol_message Ably::Models::ProtocolMessage::ACTION.Attach, :suspended, message_options - end + message_options[:channelSerial] = channel.properties.channel_serial # RTL4c1 - def send_detach_protocol_message(previous_state) - send_state_change_protocol_message Ably::Models::ProtocolMessage::ACTION.Detach, previous_state # return to previous state if failed + state_at_time_of_request = channel.state + attach_action = Ably::Models::ProtocolMessage::ACTION.Attach + # RTL4f + @pending_state_change_timer = EventMachine::Timer.new(realtime_request_timeout) do + if channel.state == state_at_time_of_request + error = Ably::Models::ErrorInfo.new(code: Ably::Exceptions::Codes::CHANNEL_OPERATION_FAILED_NO_RESPONSE_FROM_SERVER, message: "Channel #{attach_action} operation failed (timed out)") + channel.transition_state_machine :suspended, reason: error # return to suspended state if failed + end + end + # Shouldn't queue attach message as per RTL4i, so message is added top of the queue + # to be sent immediately while processing next message + connection.send_protocol_message_immediately( + action: attach_action.to_i, + channel: channel.name, + **message_options.to_h + ) end - def send_state_change_protocol_message(new_state, state_if_failed, message_options = {}) + def send_detach_protocol_message(previous_state) state_at_time_of_request = channel.state + detach_action = Ably::Models::ProtocolMessage::ACTION.Detach + @pending_state_change_timer = EventMachine::Timer.new(realtime_request_timeout) do if channel.state == state_at_time_of_request - error = Ably::Models::ErrorInfo.new(code: Ably::Exceptions::Codes::CHANNEL_OPERATION_FAILED_NO_RESPONSE_FROM_SERVER, message: "Channel #{new_state} operation failed (timed out)") - channel.transition_state_machine state_if_failed, reason: error + error = Ably::Models::ErrorInfo.new(code: Ably::Exceptions::Codes::CHANNEL_OPERATION_FAILED_NO_RESPONSE_FROM_SERVER, message: "Channel #{detach_action} operation failed (timed out)") + channel.transition_state_machine previous_state, reason: error # return to previous state if failed end end - channel.once_state_changed do - @pending_state_change_timer.cancel if @pending_state_change_timer - @pending_state_change_timer = nil - end - - resend_if_disconnected_and_connected = lambda do + on_disconnected_and_connected = lambda do |&block| connection.unsafe_once(:disconnected) do - next unless pending_state_change_timer connection.unsafe_once(:connected) do - next unless pending_state_change_timer - connection.send_protocol_message( - action: new_state.to_i, - channel: channel.name, - **message_options.to_h - ) - resend_if_disconnected_and_connected.call - end + block.call if pending_state_change_timer + end if pending_state_change_timer end end - resend_if_disconnected_and_connected.call - connection.send_protocol_message( - action: new_state.to_i, - channel: channel.name, - **message_options.to_h - ) - end - - def update_presence_sync_state_following_attached(attached_protocol_message) - if attached_protocol_message.has_presence_flag? - channel.presence.manager.sync_expected - else - channel.presence.manager.sync_not_expected + send_detach_message = lambda do + on_disconnected_and_connected.call do + send_detach_message.call + end + connection.send_protocol_message( + action: detach_action.to_i, + channel: channel.name + ) end + + send_detach_message.call end def logger connection.logger end