lib/ably/realtime/client/incoming_message_dispatcher.rb in ably-0.7.2 vs lib/ably/realtime/client/incoming_message_dispatcher.rb in ably-0.7.4
- old
+ new
@@ -39,10 +39,24 @@
unless [:nack, :error].include?(protocol_message.action)
logger.debug "#{protocol_message.action} received: #{protocol_message}"
end
+ if [:sync, :presence, :message].any? { |prevent_duplicate| protocol_message.action == prevent_duplicate }
+ if connection.serial && protocol_message.has_connection_serial? && protocol_message.connection_serial <= connection.serial
+ error_target = if protocol_message.channel
+ get_channel(protocol_message.channel)
+ else
+ connection
+ end
+ error_message = "Protocol error, duplicate message received for serial #{protocol_message.connection_serial}"
+ error_target.trigger :error, Ably::Exceptions::ProtocolError.new(error_message, 400, 80013)
+ logger.error error_message
+ return
+ end
+ end
+
update_connection_recovery_info protocol_message
case protocol_message.action
when ACTION.Heartbeat
when ACTION.Ack
@@ -52,40 +66,44 @@
logger.warn "NACK received: #{protocol_message}"
nack_pending_queue_for_message_serial(protocol_message) if protocol_message.has_message_serial?
when ACTION.Connect
when ACTION.Connected
- connection.transition_state_machine :connected, protocol_message.error
+ connection.transition_state_machine :connected, protocol_message unless connection.connected?
when ACTION.Disconnect, ACTION.Disconnected
- connection.transition_state_machine :disconnected, protocol_message.error
+ connection.transition_state_machine :disconnected, protocol_message.error unless connection.disconnected?
when ACTION.Close
when ACTION.Closed
- connection.transition_state_machine :closed
+ connection.transition_state_machine :closed unless connection.closed?
when ACTION.Error
if protocol_message.channel && !protocol_message.has_message_serial?
dispatch_channel_error protocol_message
else
process_connection_error protocol_message
end
when ACTION.Attach
when ACTION.Attached
- get_channel(protocol_message.channel).transition_state_machine :attached, protocol_message
+ get_channel(protocol_message.channel).tap do |channel|
+ channel.transition_state_machine :attached, protocol_message unless channel.attached?
+ end
when ACTION.Detach
when ACTION.Detached
- get_channel(protocol_message.channel).transition_state_machine :detached
+ get_channel(protocol_message.channel).tap do |channel|
+ channel.transition_state_machine :detached unless channel.detached?
+ end
when ACTION.Sync
presence = get_channel(protocol_message.channel).presence
protocol_message.presence.each do |presence_message|
presence.__incoming_msgbus__.publish :sync, presence_message
end
- presence.update_sync_serial protocol_message.channel_serial
+ presence.members.update_sync_serial protocol_message.channel_serial
when ACTION.Presence
presence = get_channel(protocol_message.channel).presence
protocol_message.presence.each do |presence_message|
presence.__incoming_msgbus__.publish :presence, presence_message
@@ -96,11 +114,13 @@
protocol_message.messages.each do |message|
channel.__incoming_msgbus__.publish :message, message
end
else
- raise ArgumentError, "Protocol Message Action #{protocol_message.action} is unsupported by this MessageDispatcher"
+ error = Ably::Exceptions::ProtocolError.new("Protocol Message Action #{protocol_message.action} is unsupported by this MessageDispatcher", 400, 80013)
+ client.connection.trigger :error, error
+ logger.fatal error.message
end
end
def dispatch_channel_error(protocol_message)
logger.warn "Channel Error message received: #{protocol_message.error}"
@@ -114,30 +134,13 @@
def process_connection_error(protocol_message)
connection.manager.error_received_from_server protocol_message.error
end
def update_connection_recovery_info(protocol_message)
- if protocol_message.connection_key && (protocol_message.connection_key != connection.key)
- logger.debug "New connection ID set to #{protocol_message.connection_id} with connection key #{protocol_message.connection_key}"
- detach_attached_channels protocol_message.error if protocol_message.error
- connection.configure_new protocol_message.connection_id, protocol_message.connection_key, protocol_message.connection_serial
- end
-
- if protocol_message.has_connection_serial?
- connection.update_connection_serial protocol_message.connection_serial
- end
+ connection.update_connection_serial protocol_message.connection_serial if protocol_message.has_connection_serial?
end
- def detach_attached_channels(error)
- channels.select do |channel|
- channel.attached? || channel.attaching?
- end.each do |channel|
- logger.warn "Detaching channel '#{channel.name}': #{error}"
- channel.manager.suspend error
- end
- end
-
def ack_pending_queue_for_message_serial(ack_protocol_message)
drop_pending_queue_from_ack(ack_protocol_message) do |protocol_message|
ack_messages protocol_message.messages
ack_messages protocol_message.presence
end
@@ -164,10 +167,10 @@
end
end
def drop_pending_queue_from_ack(ack_protocol_message)
message_serial_up_to = ack_protocol_message.message_serial + ack_protocol_message.count - 1
- connection.__pending_message_queue__.drop_while do |protocol_message|
+ connection.__pending_message_ack_queue__.drop_while do |protocol_message|
if protocol_message.message_serial <= message_serial_up_to
yield protocol_message
true
end
end