lib/ably/realtime/client/incoming_message_dispatcher.rb in ably-0.8.15 vs lib/ably/realtime/client/incoming_message_dispatcher.rb in ably-1.0.0

- old
+ new

@@ -25,11 +25,11 @@ client.channels end def get_channel(channel_name) channels.fetch(channel_name) do - logger.warn "Received channel message for non-existent channel" + logger.warn { "Received channel message for non-existent channel" } Ably::Realtime::Models::NilChannel.new end end def logger @@ -41,23 +41,17 @@ unless protocol_message.kind_of?(Ably::Models::ProtocolMessage) raise ArgumentError, "Expected a ProtocolMessage. Received #{protocol_message}" end - unless [:nack, :error].include?(protocol_message.action) - logger.debug "#{protocol_message.action} received: #{protocol_message}" + unless protocol_message.action.match_any?(:nack, :error) + logger.debug { "#{protocol_message.action} received: #{protocol_message}" } end - if [:sync, :presence, :message].any? { |prevent_duplicate| protocol_message.action == prevent_duplicate } + if protocol_message.action.match_any?(:sync, :presence, :message) if connection.serial && protocol_message.has_connection_serial? && protocol_message.connection_serial <= connection.serial - error_target = if protocol_message.channel - get_channel(protocol_message.channel) - else - connection - end error_message = "Protocol error, duplicate message received for serial #{protocol_message.connection_serial}" - error_target.emit :error, Ably::Exceptions::ProtocolError.new(error_message, 400, 80013) logger.error error_message return end end @@ -67,19 +61,22 @@ when ACTION.Heartbeat when ACTION.Ack ack_pending_queue_for_message_serial(protocol_message) if protocol_message.has_message_serial? when ACTION.Nack - logger.warn "NACK received: #{protocol_message}" + logger.warn { "NACK received: #{protocol_message}" } nack_pending_queue_for_message_serial(protocol_message) if protocol_message.has_message_serial? when ACTION.Connect when ACTION.Connected - if connection.disconnected? || connection.closing? || connection.closed? || connection.failed? - logger.debug "Incoming CONNECTED ProtocolMessage discarded as connection has moved on and is in state: #{connection.state}" + if connection.closing? + logger.debug { "Out-of-order incoming CONNECTED ProtocolMessage discarded as connection has moved on and is in state: #{connection.state}" } + elsif connection.disconnected? || connection.closing? || connection.closed? || connection.failed? + logger.warn { "Out-of-order incoming CONNECTED ProtocolMessage discarded as connection has moved on and is in state: #{connection.state}" } elsif connection.connected? - logger.error "CONNECTED ProtocolMessage should not have been received when the connection is in the CONNECTED state" + logger.debug { "Updated CONNECTED ProtocolMessage received (whilst connected)" } + process_connected_update_message protocol_message else process_connected_message protocol_message end when ACTION.Disconnect, ACTION.Disconnected @@ -88,34 +85,35 @@ when ACTION.Close when ACTION.Closed connection.transition_state_machine :closed unless connection.closed? when ACTION.Error - if protocol_message.channel && !protocol_message.has_message_serial? + if protocol_message.channel dispatch_channel_error protocol_message else process_connection_error protocol_message end when ACTION.Attach when ACTION.Attached get_channel(protocol_message.channel).tap do |channel| - channel.transition_state_machine :attached, reason: protocol_message.error, protocol_message: protocol_message unless channel.attached? + if channel.attached? + channel.manager.duplicate_attached_received protocol_message + else + channel.transition_state_machine :attached, reason: protocol_message.error, resumed: protocol_message.has_channel_resumed_flag?, protocol_message: protocol_message + end end when ACTION.Detach when ACTION.Detached get_channel(protocol_message.channel).tap do |channel| - channel.transition_state_machine :detached unless channel.detached? + channel.manager.detached_received protocol_message.error end when ACTION.Sync presence = get_channel(protocol_message.channel).presence - protocol_message.presence.each do |presence_message| - presence.__incoming_msgbus__.publish :sync, presence_message - end - presence.members.update_sync_serial protocol_message.channel_serial + presence.manager.sync_process_messages protocol_message.channel_serial, protocol_message.presence when ACTION.Presence presence = get_channel(protocol_message.channel).presence protocol_message.presence.each do |presence_message| presence.__incoming_msgbus__.publish :presence, presence_message @@ -125,41 +123,52 @@ channel = get_channel(protocol_message.channel) protocol_message.messages.each do |message| channel.__incoming_msgbus__.publish :message, message end + when ACTION.Auth + client.auth.authorize + else error = Ably::Exceptions::ProtocolError.new("Protocol Message Action #{protocol_message.action} is unsupported by this MessageDispatcher", 400, 80013) - client.connection.emit :error, error logger.fatal error.message end + + connection.set_connection_confirmed_alive end def dispatch_channel_error(protocol_message) - logger.warn "Channel Error message received: #{protocol_message.error}" + logger.warn { "Channel Error message received: #{protocol_message.error}" } if !protocol_message.has_message_serial? get_channel(protocol_message.channel).transition_state_machine :failed, reason: protocol_message.error else - logger.fatal "Cannot process ProtocolMessage as not yet implemented: #{protocol_message}" + logger.fatal { "Cannot process ProtocolMessage ERROR with message serial as not yet implemented: #{protocol_message}" } end end def process_connection_error(protocol_message) connection.manager.error_received_from_server(protocol_message.error || Ably::Models::ErrorInfo.new(message: 'Error reason unknown')) end def process_connected_message(protocol_message) if client.auth.token_client_id_allowed?(protocol_message.connection_details.client_id) - client.auth.configure_client_id protocol_message.connection_details.client_id - client.connection.set_connection_details protocol_message.connection_details connection.transition_state_machine :connected, reason: protocol_message.error, protocol_message: protocol_message else - reason = Ably::Exceptions::IncompatibleClientId.new("Client ID '#{protocol_message.connection_details.client_id}' specified by the server is incompatible with the library's configured client ID '#{client.client_id}'", 400, 40012) + reason = Ably::Exceptions::IncompatibleClientId.new("Client ID '#{protocol_message.connection_details.client_id}' specified by the server is incompatible with the library's configured client ID '#{client.client_id}'") connection.transition_state_machine :failed, reason: reason, protocol_message: protocol_message end end + def process_connected_update_message(protocol_message) + if client.auth.token_client_id_allowed?(protocol_message.connection_details.client_id) + connection.manager.connected_update protocol_message + else + reason = Ably::Exceptions::IncompatibleClientId.new("Client ID '#{protocol_message.connection_details.client_id}' in CONNECTED update specified by the server is incompatible with the library's configured client ID '#{client.client_id}'") + connection.transition_state_machine :failed, reason: reason, protocol_message: protocol_message + end + end + def update_connection_recovery_info(protocol_message) connection.update_connection_serial protocol_message.connection_serial if protocol_message.has_connection_serial? end def ack_pending_queue_for_message_serial(ack_protocol_message) @@ -176,17 +185,17 @@ end end def ack_messages(messages) messages.each do |message| - logger.debug "Calling ACK success callbacks for #{message.class.name} - #{message.to_safe_json}" + logger.debug { "Calling ACK success callbacks for #{message.class.name} - #{message.to_json}" } message.succeed message end end def nack_messages(messages, protocol_message) messages.each do |message| - logger.debug "Calling NACK failure callbacks for #{message.class.name} - #{message.to_safe_json}, protocol message: #{protocol_message}" + logger.debug { "Calling NACK failure callbacks for #{message.class.name} - #{message.to_json}, protocol message: #{protocol_message}" } message.fail protocol_message.error end end def drop_pending_queue_from_ack(ack_protocol_message)