lib/ably/realtime/client/incoming_message_dispatcher.rb in ably-0.1.5 vs lib/ably/realtime/client/incoming_message_dispatcher.rb in ably-0.1.6

- old
+ new

@@ -1,99 +1,128 @@ module Ably::Realtime class Client - # IncomingMessageDispatcher is a (private) class that is used to dispatch {Ably::Realtime::Models::ProtocolMessage} that are + # IncomingMessageDispatcher is a (private) class that is used to dispatch {Ably::Models::ProtocolMessage} that are # received from Ably via the {Ably::Realtime::Connection} class IncomingMessageDispatcher - ACTION = Models::ProtocolMessage::ACTION + ACTION = Ably::Models::ProtocolMessage::ACTION - def initialize(client) - @client = client + def initialize(client, connection) + @client = client + @connection = connection subscribe_to_incoming_protocol_messages end private - attr_reader :client + attr_reader :client, :connection - def connection - client.connection - end - def channels client.channels end def get_channel(channel_name) channels.fetch(channel_name) do logger.warn "Received channel message for non-existent channel" - Models::NilChannel.new + Ably::Models::NilChannel.new end end def logger client.logger end def dispatch_protocol_message(*args) protocol_message = args.first - unless protocol_message.kind_of?(Models::ProtocolMessage) + unless protocol_message.kind_of?(Ably::Models::ProtocolMessage) raise ArgumentError, "Expected a ProtocolMessage. Received #{protocol_message}" end unless [:nack, :error].include?(protocol_message.action) logger.debug "#{protocol_message.action} received: #{protocol_message}" end + update_connection_id protocol_message + case protocol_message.action when ACTION.Heartbeat when ACTION.Ack ack_pending_queue_for_message_serial(protocol_message) if protocol_message.has_message_serial? when ACTION.Nack logger.warn "NACK received: #{protocol_message}" nack_pending_queue_for_message_serial(protocol_message) if protocol_message.has_message_serial? - when ACTION.Connect, ACTION.Connected + when ACTION.Connect + when ACTION.Connected + connection.transition_state_machine :connected + when ACTION.Disconnect, ACTION.Disconnected + when ACTION.Close when ACTION.Closed + connection.transition_state_machine :closed + when ACTION.Error logger.error "Error received: #{protocol_message.error}" + if protocol_message.channel && !protocol_message.has_message_serial? + get_channel(protocol_message.channel).change_state Ably::Realtime::Channel::STATE.Failed, protocol_message.error + end when ACTION.Attach when ACTION.Attached get_channel(protocol_message.channel).change_state Ably::Realtime::Channel::STATE.Attached when ACTION.Detach when ACTION.Detached get_channel(protocol_message.channel).change_state Ably::Realtime::Channel::STATE.Detached when ACTION.Presence + protocol_message.presence.each do |presence| + get_channel(protocol_message.channel).presence.__incoming_msgbus__.publish :presence, presence + end + when ACTION.Message protocol_message.messages.each do |message| - get_channel(protocol_message.channel).__incoming_protocol_msgbus__.publish :message, message + get_channel(protocol_message.channel).__incoming_msgbus__.publish :message, message end else raise ArgumentError, "Protocol Message Action #{protocol_message.action} is unsupported by this MessageDispatcher" end end + def update_connection_id(protocol_message) + if protocol_message.connection_id && (protocol_message.connection_id != connection.id) + logger.debug "New connection ID set to #{protocol_message.connection_id}" + connection.update_connection_id protocol_message.connection_id + end + end + def ack_pending_queue_for_message_serial(ack_protocol_message) drop_pending_queue_from_ack(ack_protocol_message) do |protocol_message| - protocol_message.messages.each do |message| - logger.debug "Calling ACK success callbacks for #{message.to_json}" - message.succeed message - end + ack_messages protocol_message.messages + ack_messages protocol_message.presence end end def nack_pending_queue_for_message_serial(nack_protocol_message) drop_pending_queue_from_ack(nack_protocol_message) do |protocol_message| - protocol_message.messages.each do |message| - logger.debug "Calling NACK failure callbacks for #{message.to_json}" - message.fail message, nack_protocol_message.error - end + nack_messages protocol_message.messages, nack_protocol_message + nack_messages protocol_message.presence, nack_protocol_message + end + end + + def ack_messages(messages) + messages.each do |message| + logger.debug "Calling ACK success callbacks for #{message.class.name} - #{message.to_json}" + message.succeed message + end + end + + def nack_messages(messages, protocol_message) + messages.each do |message| + logger.debug "Calling NACK failure callbacks for #{message.class.name} - #{message.to_json}, protocol message: #{protocol_message}" + message.fail message, protocol_message.error end end def drop_pending_queue_from_ack(ack_protocol_message) message_serial_up_to = ack_protocol_message.message_serial + ack_protocol_message.count - 1