lib/ably/realtime/client/incoming_message_dispatcher.rb in ably-0.6.2 vs lib/ably/realtime/client/incoming_message_dispatcher.rb in ably-0.7.0

- old
+ new

@@ -39,11 +39,11 @@ unless [:nack, :error].include?(protocol_message.action) logger.debug "#{protocol_message.action} received: #{protocol_message}" end - update_connection_id protocol_message + update_connection_recovery_info protocol_message case protocol_message.action when ACTION.Heartbeat when ACTION.Ack ack_pending_queue_for_message_serial(protocol_message) if protocol_message.has_message_serial? @@ -55,57 +55,75 @@ when ACTION.Connect when ACTION.Connected connection.transition_state_machine :connected when ACTION.Disconnect, ACTION.Disconnected + connection.transition_state_machine :disconnected, protocol_message.error when ACTION.Close when ACTION.Closed connection.transition_state_machine :closed when ACTION.Error - logger.error "Error received: #{protocol_message.error}" if protocol_message.channel && !protocol_message.has_message_serial? dispatch_channel_error protocol_message else - connection.transition_state_machine :failed, protocol_message.error + process_connection_error protocol_message end when ACTION.Attach when ACTION.Attached - get_channel(protocol_message.channel).change_state Ably::Realtime::Channel::STATE.Attached + get_channel(protocol_message.channel).transition_state_machine :attached, protocol_message when ACTION.Detach when ACTION.Detached - get_channel(protocol_message.channel).change_state Ably::Realtime::Channel::STATE.Detached + get_channel(protocol_message.channel).transition_state_machine :detached + when ACTION.Sync + presence = get_channel(protocol_message.channel).presence + protocol_message.presence.each do |presence_message| + presence.__incoming_msgbus__.publish :sync, presence_message + end + presence.update_sync_serial protocol_message.channel_serial + when ACTION.Presence - protocol_message.presence.each do |presence| - get_channel(protocol_message.channel).presence.__incoming_msgbus__.publish :presence, presence + presence = get_channel(protocol_message.channel).presence + protocol_message.presence.each do |presence_message| + presence.__incoming_msgbus__.publish :presence, presence_message end when ACTION.Message + channel = get_channel(protocol_message.channel) protocol_message.messages.each do |message| - get_channel(protocol_message.channel).__incoming_msgbus__.publish :message, message + channel.__incoming_msgbus__.publish :message, message end else raise ArgumentError, "Protocol Message Action #{protocol_message.action} is unsupported by this MessageDispatcher" end end def dispatch_channel_error(protocol_message) + logger.warn "Channel Error message received: #{protocol_message.error}" if !protocol_message.has_message_serial? - get_channel(protocol_message.channel).change_state Ably::Realtime::Channel::STATE.Failed, protocol_message.error + get_channel(protocol_message.channel).transition_state_machine :failed, protocol_message.error else logger.fatal "Cannot process ProtocolMessage as not yet implemented: #{protocol_message}" end end - def update_connection_id(protocol_message) - if protocol_message.connection_id && (protocol_message.connection_id != connection.id) - logger.debug "New connection ID set to #{protocol_message.connection_id}" - connection.update_connection_id protocol_message.connection_id + def process_connection_error(protocol_message) + connection.manager.error_received_from_server protocol_message.error + end + + def update_connection_recovery_info(protocol_message) + if protocol_message.connection_key && (protocol_message.connection_key != connection.key) + logger.debug "New connection ID set to #{protocol_message.connection_id} with connection key #{protocol_message.connection_key}" + connection.update_connection_id_and_key protocol_message.connection_id, protocol_message.connection_key + end + + if protocol_message.has_connection_serial? + connection.update_connection_serial protocol_message.connection_serial end end def ack_pending_queue_for_message_serial(ack_protocol_message) drop_pending_queue_from_ack(ack_protocol_message) do |protocol_message|