lib/ably/realtime/client/incoming_message_dispatcher.rb in ably-0.7.2 vs lib/ably/realtime/client/incoming_message_dispatcher.rb in ably-0.7.4

- old
+ new

@@ -39,10 +39,24 @@ unless [:nack, :error].include?(protocol_message.action) logger.debug "#{protocol_message.action} received: #{protocol_message}" end + if [:sync, :presence, :message].any? { |prevent_duplicate| protocol_message.action == prevent_duplicate } + if connection.serial && protocol_message.has_connection_serial? && protocol_message.connection_serial <= connection.serial + error_target = if protocol_message.channel + get_channel(protocol_message.channel) + else + connection + end + error_message = "Protocol error, duplicate message received for serial #{protocol_message.connection_serial}" + error_target.trigger :error, Ably::Exceptions::ProtocolError.new(error_message, 400, 80013) + logger.error error_message + return + end + end + update_connection_recovery_info protocol_message case protocol_message.action when ACTION.Heartbeat when ACTION.Ack @@ -52,40 +66,44 @@ logger.warn "NACK received: #{protocol_message}" nack_pending_queue_for_message_serial(protocol_message) if protocol_message.has_message_serial? when ACTION.Connect when ACTION.Connected - connection.transition_state_machine :connected, protocol_message.error + connection.transition_state_machine :connected, protocol_message unless connection.connected? when ACTION.Disconnect, ACTION.Disconnected - connection.transition_state_machine :disconnected, protocol_message.error + connection.transition_state_machine :disconnected, protocol_message.error unless connection.disconnected? when ACTION.Close when ACTION.Closed - connection.transition_state_machine :closed + connection.transition_state_machine :closed unless connection.closed? when ACTION.Error if protocol_message.channel && !protocol_message.has_message_serial? dispatch_channel_error protocol_message else process_connection_error protocol_message end when ACTION.Attach when ACTION.Attached - get_channel(protocol_message.channel).transition_state_machine :attached, protocol_message + get_channel(protocol_message.channel).tap do |channel| + channel.transition_state_machine :attached, protocol_message unless channel.attached? + end when ACTION.Detach when ACTION.Detached - get_channel(protocol_message.channel).transition_state_machine :detached + get_channel(protocol_message.channel).tap do |channel| + channel.transition_state_machine :detached unless channel.detached? + end when ACTION.Sync presence = get_channel(protocol_message.channel).presence protocol_message.presence.each do |presence_message| presence.__incoming_msgbus__.publish :sync, presence_message end - presence.update_sync_serial protocol_message.channel_serial + presence.members.update_sync_serial protocol_message.channel_serial when ACTION.Presence presence = get_channel(protocol_message.channel).presence protocol_message.presence.each do |presence_message| presence.__incoming_msgbus__.publish :presence, presence_message @@ -96,11 +114,13 @@ protocol_message.messages.each do |message| channel.__incoming_msgbus__.publish :message, message end else - raise ArgumentError, "Protocol Message Action #{protocol_message.action} is unsupported by this MessageDispatcher" + error = Ably::Exceptions::ProtocolError.new("Protocol Message Action #{protocol_message.action} is unsupported by this MessageDispatcher", 400, 80013) + client.connection.trigger :error, error + logger.fatal error.message end end def dispatch_channel_error(protocol_message) logger.warn "Channel Error message received: #{protocol_message.error}" @@ -114,30 +134,13 @@ def process_connection_error(protocol_message) connection.manager.error_received_from_server protocol_message.error end def update_connection_recovery_info(protocol_message) - if protocol_message.connection_key && (protocol_message.connection_key != connection.key) - logger.debug "New connection ID set to #{protocol_message.connection_id} with connection key #{protocol_message.connection_key}" - detach_attached_channels protocol_message.error if protocol_message.error - connection.configure_new protocol_message.connection_id, protocol_message.connection_key, protocol_message.connection_serial - end - - if protocol_message.has_connection_serial? - connection.update_connection_serial protocol_message.connection_serial - end + connection.update_connection_serial protocol_message.connection_serial if protocol_message.has_connection_serial? end - def detach_attached_channels(error) - channels.select do |channel| - channel.attached? || channel.attaching? - end.each do |channel| - logger.warn "Detaching channel '#{channel.name}': #{error}" - channel.manager.suspend error - end - end - def ack_pending_queue_for_message_serial(ack_protocol_message) drop_pending_queue_from_ack(ack_protocol_message) do |protocol_message| ack_messages protocol_message.messages ack_messages protocol_message.presence end @@ -164,10 +167,10 @@ end end def drop_pending_queue_from_ack(ack_protocol_message) message_serial_up_to = ack_protocol_message.message_serial + ack_protocol_message.count - 1 - connection.__pending_message_queue__.drop_while do |protocol_message| + connection.__pending_message_ack_queue__.drop_while do |protocol_message| if protocol_message.message_serial <= message_serial_up_to yield protocol_message true end end