lib/ably/realtime/client/incoming_message_dispatcher.rb in ably-0.6.2 vs lib/ably/realtime/client/incoming_message_dispatcher.rb in ably-0.7.0
- old
+ new
@@ -39,11 +39,11 @@
unless [:nack, :error].include?(protocol_message.action)
logger.debug "#{protocol_message.action} received: #{protocol_message}"
end
- update_connection_id protocol_message
+ update_connection_recovery_info protocol_message
case protocol_message.action
when ACTION.Heartbeat
when ACTION.Ack
ack_pending_queue_for_message_serial(protocol_message) if protocol_message.has_message_serial?
@@ -55,57 +55,75 @@
when ACTION.Connect
when ACTION.Connected
connection.transition_state_machine :connected
when ACTION.Disconnect, ACTION.Disconnected
+ connection.transition_state_machine :disconnected, protocol_message.error
when ACTION.Close
when ACTION.Closed
connection.transition_state_machine :closed
when ACTION.Error
- logger.error "Error received: #{protocol_message.error}"
if protocol_message.channel && !protocol_message.has_message_serial?
dispatch_channel_error protocol_message
else
- connection.transition_state_machine :failed, protocol_message.error
+ process_connection_error protocol_message
end
when ACTION.Attach
when ACTION.Attached
- get_channel(protocol_message.channel).change_state Ably::Realtime::Channel::STATE.Attached
+ get_channel(protocol_message.channel).transition_state_machine :attached, protocol_message
when ACTION.Detach
when ACTION.Detached
- get_channel(protocol_message.channel).change_state Ably::Realtime::Channel::STATE.Detached
+ get_channel(protocol_message.channel).transition_state_machine :detached
+ when ACTION.Sync
+ presence = get_channel(protocol_message.channel).presence
+ protocol_message.presence.each do |presence_message|
+ presence.__incoming_msgbus__.publish :sync, presence_message
+ end
+ presence.update_sync_serial protocol_message.channel_serial
+
when ACTION.Presence
- protocol_message.presence.each do |presence|
- get_channel(protocol_message.channel).presence.__incoming_msgbus__.publish :presence, presence
+ presence = get_channel(protocol_message.channel).presence
+ protocol_message.presence.each do |presence_message|
+ presence.__incoming_msgbus__.publish :presence, presence_message
end
when ACTION.Message
+ channel = get_channel(protocol_message.channel)
protocol_message.messages.each do |message|
- get_channel(protocol_message.channel).__incoming_msgbus__.publish :message, message
+ channel.__incoming_msgbus__.publish :message, message
end
else
raise ArgumentError, "Protocol Message Action #{protocol_message.action} is unsupported by this MessageDispatcher"
end
end
def dispatch_channel_error(protocol_message)
+ logger.warn "Channel Error message received: #{protocol_message.error}"
if !protocol_message.has_message_serial?
- get_channel(protocol_message.channel).change_state Ably::Realtime::Channel::STATE.Failed, protocol_message.error
+ get_channel(protocol_message.channel).transition_state_machine :failed, protocol_message.error
else
logger.fatal "Cannot process ProtocolMessage as not yet implemented: #{protocol_message}"
end
end
- def update_connection_id(protocol_message)
- if protocol_message.connection_id && (protocol_message.connection_id != connection.id)
- logger.debug "New connection ID set to #{protocol_message.connection_id}"
- connection.update_connection_id protocol_message.connection_id
+ def process_connection_error(protocol_message)
+ connection.manager.error_received_from_server protocol_message.error
+ end
+
+ def update_connection_recovery_info(protocol_message)
+ if protocol_message.connection_key && (protocol_message.connection_key != connection.key)
+ logger.debug "New connection ID set to #{protocol_message.connection_id} with connection key #{protocol_message.connection_key}"
+ connection.update_connection_id_and_key protocol_message.connection_id, protocol_message.connection_key
+ end
+
+ if protocol_message.has_connection_serial?
+ connection.update_connection_serial protocol_message.connection_serial
end
end
def ack_pending_queue_for_message_serial(ack_protocol_message)
drop_pending_queue_from_ack(ack_protocol_message) do |protocol_message|