lib/ably/realtime/client/incoming_message_dispatcher.rb in ably-0.8.15 vs lib/ably/realtime/client/incoming_message_dispatcher.rb in ably-1.0.0
- old
+ new
@@ -25,11 +25,11 @@
client.channels
end
def get_channel(channel_name)
channels.fetch(channel_name) do
- logger.warn "Received channel message for non-existent channel"
+ logger.warn { "Received channel message for non-existent channel" }
Ably::Realtime::Models::NilChannel.new
end
end
def logger
@@ -41,23 +41,17 @@
unless protocol_message.kind_of?(Ably::Models::ProtocolMessage)
raise ArgumentError, "Expected a ProtocolMessage. Received #{protocol_message}"
end
- unless [:nack, :error].include?(protocol_message.action)
- logger.debug "#{protocol_message.action} received: #{protocol_message}"
+ unless protocol_message.action.match_any?(:nack, :error)
+ logger.debug { "#{protocol_message.action} received: #{protocol_message}" }
end
- if [:sync, :presence, :message].any? { |prevent_duplicate| protocol_message.action == prevent_duplicate }
+ if protocol_message.action.match_any?(:sync, :presence, :message)
if connection.serial && protocol_message.has_connection_serial? && protocol_message.connection_serial <= connection.serial
- error_target = if protocol_message.channel
- get_channel(protocol_message.channel)
- else
- connection
- end
error_message = "Protocol error, duplicate message received for serial #{protocol_message.connection_serial}"
- error_target.emit :error, Ably::Exceptions::ProtocolError.new(error_message, 400, 80013)
logger.error error_message
return
end
end
@@ -67,19 +61,22 @@
when ACTION.Heartbeat
when ACTION.Ack
ack_pending_queue_for_message_serial(protocol_message) if protocol_message.has_message_serial?
when ACTION.Nack
- logger.warn "NACK received: #{protocol_message}"
+ logger.warn { "NACK received: #{protocol_message}" }
nack_pending_queue_for_message_serial(protocol_message) if protocol_message.has_message_serial?
when ACTION.Connect
when ACTION.Connected
- if connection.disconnected? || connection.closing? || connection.closed? || connection.failed?
- logger.debug "Incoming CONNECTED ProtocolMessage discarded as connection has moved on and is in state: #{connection.state}"
+ if connection.closing?
+ logger.debug { "Out-of-order incoming CONNECTED ProtocolMessage discarded as connection has moved on and is in state: #{connection.state}" }
+ elsif connection.disconnected? || connection.closing? || connection.closed? || connection.failed?
+ logger.warn { "Out-of-order incoming CONNECTED ProtocolMessage discarded as connection has moved on and is in state: #{connection.state}" }
elsif connection.connected?
- logger.error "CONNECTED ProtocolMessage should not have been received when the connection is in the CONNECTED state"
+ logger.debug { "Updated CONNECTED ProtocolMessage received (whilst connected)" }
+ process_connected_update_message protocol_message
else
process_connected_message protocol_message
end
when ACTION.Disconnect, ACTION.Disconnected
@@ -88,34 +85,35 @@
when ACTION.Close
when ACTION.Closed
connection.transition_state_machine :closed unless connection.closed?
when ACTION.Error
- if protocol_message.channel && !protocol_message.has_message_serial?
+ if protocol_message.channel
dispatch_channel_error protocol_message
else
process_connection_error protocol_message
end
when ACTION.Attach
when ACTION.Attached
get_channel(protocol_message.channel).tap do |channel|
- channel.transition_state_machine :attached, reason: protocol_message.error, protocol_message: protocol_message unless channel.attached?
+ if channel.attached?
+ channel.manager.duplicate_attached_received protocol_message
+ else
+ channel.transition_state_machine :attached, reason: protocol_message.error, resumed: protocol_message.has_channel_resumed_flag?, protocol_message: protocol_message
+ end
end
when ACTION.Detach
when ACTION.Detached
get_channel(protocol_message.channel).tap do |channel|
- channel.transition_state_machine :detached unless channel.detached?
+ channel.manager.detached_received protocol_message.error
end
when ACTION.Sync
presence = get_channel(protocol_message.channel).presence
- protocol_message.presence.each do |presence_message|
- presence.__incoming_msgbus__.publish :sync, presence_message
- end
- presence.members.update_sync_serial protocol_message.channel_serial
+ presence.manager.sync_process_messages protocol_message.channel_serial, protocol_message.presence
when ACTION.Presence
presence = get_channel(protocol_message.channel).presence
protocol_message.presence.each do |presence_message|
presence.__incoming_msgbus__.publish :presence, presence_message
@@ -125,41 +123,52 @@
channel = get_channel(protocol_message.channel)
protocol_message.messages.each do |message|
channel.__incoming_msgbus__.publish :message, message
end
+ when ACTION.Auth
+ client.auth.authorize
+
else
error = Ably::Exceptions::ProtocolError.new("Protocol Message Action #{protocol_message.action} is unsupported by this MessageDispatcher", 400, 80013)
- client.connection.emit :error, error
logger.fatal error.message
end
+
+ connection.set_connection_confirmed_alive
end
def dispatch_channel_error(protocol_message)
- logger.warn "Channel Error message received: #{protocol_message.error}"
+ logger.warn { "Channel Error message received: #{protocol_message.error}" }
if !protocol_message.has_message_serial?
get_channel(protocol_message.channel).transition_state_machine :failed, reason: protocol_message.error
else
- logger.fatal "Cannot process ProtocolMessage as not yet implemented: #{protocol_message}"
+ logger.fatal { "Cannot process ProtocolMessage ERROR with message serial as not yet implemented: #{protocol_message}" }
end
end
def process_connection_error(protocol_message)
connection.manager.error_received_from_server(protocol_message.error || Ably::Models::ErrorInfo.new(message: 'Error reason unknown'))
end
def process_connected_message(protocol_message)
if client.auth.token_client_id_allowed?(protocol_message.connection_details.client_id)
- client.auth.configure_client_id protocol_message.connection_details.client_id
- client.connection.set_connection_details protocol_message.connection_details
connection.transition_state_machine :connected, reason: protocol_message.error, protocol_message: protocol_message
else
- reason = Ably::Exceptions::IncompatibleClientId.new("Client ID '#{protocol_message.connection_details.client_id}' specified by the server is incompatible with the library's configured client ID '#{client.client_id}'", 400, 40012)
+ reason = Ably::Exceptions::IncompatibleClientId.new("Client ID '#{protocol_message.connection_details.client_id}' specified by the server is incompatible with the library's configured client ID '#{client.client_id}'")
connection.transition_state_machine :failed, reason: reason, protocol_message: protocol_message
end
end
+ def process_connected_update_message(protocol_message)
+ if client.auth.token_client_id_allowed?(protocol_message.connection_details.client_id)
+ connection.manager.connected_update protocol_message
+ else
+ reason = Ably::Exceptions::IncompatibleClientId.new("Client ID '#{protocol_message.connection_details.client_id}' in CONNECTED update specified by the server is incompatible with the library's configured client ID '#{client.client_id}'")
+ connection.transition_state_machine :failed, reason: reason, protocol_message: protocol_message
+ end
+ end
+
def update_connection_recovery_info(protocol_message)
connection.update_connection_serial protocol_message.connection_serial if protocol_message.has_connection_serial?
end
def ack_pending_queue_for_message_serial(ack_protocol_message)
@@ -176,17 +185,17 @@
end
end
def ack_messages(messages)
messages.each do |message|
- logger.debug "Calling ACK success callbacks for #{message.class.name} - #{message.to_safe_json}"
+ logger.debug { "Calling ACK success callbacks for #{message.class.name} - #{message.to_json}" }
message.succeed message
end
end
def nack_messages(messages, protocol_message)
messages.each do |message|
- logger.debug "Calling NACK failure callbacks for #{message.class.name} - #{message.to_safe_json}, protocol message: #{protocol_message}"
+ logger.debug { "Calling NACK failure callbacks for #{message.class.name} - #{message.to_json}, protocol message: #{protocol_message}" }
message.fail protocol_message.error
end
end
def drop_pending_queue_from_ack(ack_protocol_message)