lib/submodules/ably-ruby/lib/ably/realtime/channel/channel_manager.rb in ably-rest-0.7.3 vs lib/submodules/ably-ruby/lib/ably/realtime/channel/channel_manager.rb in ably-rest-0.7.5

- old
+ new

@@ -11,17 +11,11 @@ def initialize(channel, connection) @channel = channel @connection = connection - connection.on(:closed) do - channel.transition_state_machine :detaching if can_transition_to?(:detaching) - end - - connection.on(:failed) do |error| - channel.transition_state_machine :failed, error if can_transition_to?(:failed) - end + setup_connection_event_handlers end # Commence attachment def attach if can_transition_to?(:attached) @@ -37,16 +31,16 @@ elsif can_transition_to?(:detached) send_detach_protocol_message end end - # Commence presence SYNC if applicable - def sync(attached_protocol_message) + # Channel is attached, notify presence if sync is expected + def attached(attached_protocol_message) if attached_protocol_message.has_presence_flag? - channel.presence.sync_started + channel.presence.manager.sync_expected else - channel.presence.sync_completed + channel.presence.manager.sync_not_expected end end # An error has occurred on the channel def emit_error(error) @@ -57,10 +51,49 @@ # Detach a channel as a result of an error def suspend(error) channel.transition_state_machine! :detaching, error end + # When a channel is no longer attached or has failed, + # all messages awaiting an ACK response should fail immediately + def fail_messages_awaiting_ack(error) + # Allow a short time for other queued operations to complete before failing all messages + EventMachine.add_timer(0.1) do + error = Ably::Exceptions::MessageDeliveryError.new('Channel is no longer in a state suitable to deliver this message to the server') unless error + fail_messages_in_queue connection.__pending_message_ack_queue__, error + fail_messages_in_queue connection.__outgoing_message_queue__, error + end + end + + def fail_messages_in_queue(queue, error) + queue.delete_if do |protocol_message| + if protocol_message.channel == channel.name + nack_messages protocol_message, error + true + end + end + end + + def nack_messages(protocol_message, error) + (protocol_message.messages + protocol_message.presence).each do |message| + logger.debug "Calling NACK failure callbacks for #{message.class.name} - #{message.to_json}, protocol message: #{protocol_message}" + message.fail message, error + end + logger.debug "Calling NACK failure callbacks for #{protocol_message.class.name} - #{protocol_message.to_json}" + protocol_message.fail protocol_message, error + end + + def drop_pending_queue_from_ack(ack_protocol_message) + message_serial_up_to = ack_protocol_message.message_serial + ack_protocol_message.count - 1 + connection.__pending_message_ack_queue__.drop_while do |protocol_message| + if protocol_message.message_serial <= message_serial_up_to + yield protocol_message + true + end + end + end + private attr_reader :channel, :connection def_delegators :channel, :can_transition_to? @@ -80,9 +113,19 @@ def send_state_change_protocol_message(state) connection.send_protocol_message( action: state.to_i, channel: channel.name ) + end + + def setup_connection_event_handlers + connection.unsafe_on(:closed) do + channel.transition_state_machine :detaching if can_transition_to?(:detaching) + end + + connection.unsafe_on(:failed) do |error| + channel.transition_state_machine :failed, error if can_transition_to?(:failed) + end end def logger connection.logger end