lib/submodules/ably-ruby/lib/ably/realtime/channel/channel_manager.rb in ably-rest-1.2.4 vs lib/submodules/ably-ruby/lib/ably/realtime/channel/channel_manager.rb in ably-rest-1.2.6
- old
+ new
@@ -16,11 +16,11 @@
# Commence attachment
def attach
if can_transition_to?(:attached)
connect_if_connection_initialized
- send_attach_protocol_message
+ send_attach_protocol_message if connection.state?(:connected) # RTL4i
end
end
# Commence attachment
def detach(error, previous_state)
@@ -32,13 +32,13 @@
end
# Channel is attached, notify presence if sync is expected
def attached(attached_protocol_message)
# If no attached ProtocolMessage then this attached request was triggered by the client
- # library, such as returning to attached whne detach has failed
+ # library, such as returning to attached when detach has failed
if attached_protocol_message
- update_presence_sync_state_following_attached attached_protocol_message
+ channel.presence.manager.on_attach attached_protocol_message.has_presence_flag?
channel.properties.set_attach_serial(attached_protocol_message.channel_serial)
channel.options.set_modes_from_flags(attached_protocol_message.flags)
channel.options.set_params(attached_protocol_message.params)
end
end
@@ -47,40 +47,37 @@
def log_channel_error(error)
logger.error { "ChannelManager: Channel '#{channel.name}' error: #{error}" }
end
# Request channel to be reattached by sending an attach protocol message
- # @param [Hash] options
- # @option options [Ably::Models::ErrorInfo] :reason
- def request_reattach(options = {})
- reason = options[:reason]
- send_attach_protocol_message
- logger.debug { "Explicit channel reattach request sent to Ably due to #{reason}" }
+ # @param [Ably::Models::ErrorInfo] reason
+ def request_reattach(reason = nil)
channel.set_channel_error_reason(reason) if reason
channel.transition_state_machine! :attaching, reason: reason unless channel.attaching?
+ send_attach_protocol_message
+ logger.debug { "Explicit channel reattach request sent to Ably due to #{reason}" }
end
def duplicate_attached_received(protocol_message)
+ logger.debug { "Server initiated ATTACHED message received for channel '#{channel.name}' with state #{channel.state}" }
if protocol_message.error
channel.set_channel_error_reason protocol_message.error
log_channel_error protocol_message.error
end
channel.properties.set_attach_serial(protocol_message.channel_serial)
channel.options.set_modes_from_flags(protocol_message.flags)
- if protocol_message.has_channel_resumed_flag?
- logger.debug { "ChannelManager: Additional resumed ATTACHED message received for #{channel.state} channel '#{channel.name}'" }
- else
+ unless protocol_message.has_channel_resumed_flag?
channel.emit :update, Ably::Models::ChannelStateChange.new(
current: channel.state,
previous: channel.state,
event: Ably::Realtime::Channel::EVENT(:update),
reason: protocol_message.error,
resumed: false,
)
- update_presence_sync_state_following_attached protocol_message
+ channel.presence.manager.on_attach protocol_message.has_presence_flag?
end
end
# Handle DETACED messages, see #RTL13 for server-initated detaches
def detached_received(reason)
@@ -168,10 +165,16 @@
channel.transition_state_machine! :attaching
end
end
end
+ # RTL13c
+ def notify_state_change
+ @pending_state_change_timer.cancel if @pending_state_change_timer
+ @pending_state_change_timer = nil
+ end
+
private
attr_reader :pending_state_change_timer
def channel
@channel
@@ -207,59 +210,59 @@
message_options[:flags] = channel.options.modes_to_flags if channel.options.modes
if channel.attach_resume
message_options[:flags] = message_options[:flags].to_i | Ably::Models::ProtocolMessage::ATTACH_FLAGS_MAPPING[:resume]
end
- send_state_change_protocol_message Ably::Models::ProtocolMessage::ACTION.Attach, :suspended, message_options
- end
+ message_options[:channelSerial] = channel.properties.channel_serial # RTL4c1
- def send_detach_protocol_message(previous_state)
- send_state_change_protocol_message Ably::Models::ProtocolMessage::ACTION.Detach, previous_state # return to previous state if failed
+ state_at_time_of_request = channel.state
+ attach_action = Ably::Models::ProtocolMessage::ACTION.Attach
+ # RTL4f
+ @pending_state_change_timer = EventMachine::Timer.new(realtime_request_timeout) do
+ if channel.state == state_at_time_of_request
+ error = Ably::Models::ErrorInfo.new(code: Ably::Exceptions::Codes::CHANNEL_OPERATION_FAILED_NO_RESPONSE_FROM_SERVER, message: "Channel #{attach_action} operation failed (timed out)")
+ channel.transition_state_machine :suspended, reason: error # return to suspended state if failed
+ end
+ end
+ # Shouldn't queue attach message as per RTL4i, so message is added top of the queue
+ # to be sent immediately while processing next message
+ connection.send_protocol_message_immediately(
+ action: attach_action.to_i,
+ channel: channel.name,
+ **message_options.to_h
+ )
end
- def send_state_change_protocol_message(new_state, state_if_failed, message_options = {})
+ def send_detach_protocol_message(previous_state)
state_at_time_of_request = channel.state
+ detach_action = Ably::Models::ProtocolMessage::ACTION.Detach
+
@pending_state_change_timer = EventMachine::Timer.new(realtime_request_timeout) do
if channel.state == state_at_time_of_request
- error = Ably::Models::ErrorInfo.new(code: Ably::Exceptions::Codes::CHANNEL_OPERATION_FAILED_NO_RESPONSE_FROM_SERVER, message: "Channel #{new_state} operation failed (timed out)")
- channel.transition_state_machine state_if_failed, reason: error
+ error = Ably::Models::ErrorInfo.new(code: Ably::Exceptions::Codes::CHANNEL_OPERATION_FAILED_NO_RESPONSE_FROM_SERVER, message: "Channel #{detach_action} operation failed (timed out)")
+ channel.transition_state_machine previous_state, reason: error # return to previous state if failed
end
end
- channel.once_state_changed do
- @pending_state_change_timer.cancel if @pending_state_change_timer
- @pending_state_change_timer = nil
- end
-
- resend_if_disconnected_and_connected = lambda do
+ on_disconnected_and_connected = lambda do |&block|
connection.unsafe_once(:disconnected) do
- next unless pending_state_change_timer
connection.unsafe_once(:connected) do
- next unless pending_state_change_timer
- connection.send_protocol_message(
- action: new_state.to_i,
- channel: channel.name,
- **message_options.to_h
- )
- resend_if_disconnected_and_connected.call
- end
+ block.call if pending_state_change_timer
+ end if pending_state_change_timer
end
end
- resend_if_disconnected_and_connected.call
- connection.send_protocol_message(
- action: new_state.to_i,
- channel: channel.name,
- **message_options.to_h
- )
- end
-
- def update_presence_sync_state_following_attached(attached_protocol_message)
- if attached_protocol_message.has_presence_flag?
- channel.presence.manager.sync_expected
- else
- channel.presence.manager.sync_not_expected
+ send_detach_message = lambda do
+ on_disconnected_and_connected.call do
+ send_detach_message.call
+ end
+ connection.send_protocol_message(
+ action: detach_action.to_i,
+ channel: channel.name
+ )
end
+
+ send_detach_message.call
end
def logger
connection.logger
end