lib/submodules/ably-ruby/lib/ably/realtime/channel/channel_manager.rb in ably-rest-0.7.3 vs lib/submodules/ably-ruby/lib/ably/realtime/channel/channel_manager.rb in ably-rest-0.7.5
- old
+ new
@@ -11,17 +11,11 @@
def initialize(channel, connection)
@channel = channel
@connection = connection
- connection.on(:closed) do
- channel.transition_state_machine :detaching if can_transition_to?(:detaching)
- end
-
- connection.on(:failed) do |error|
- channel.transition_state_machine :failed, error if can_transition_to?(:failed)
- end
+ setup_connection_event_handlers
end
# Commence attachment
def attach
if can_transition_to?(:attached)
@@ -37,16 +31,16 @@
elsif can_transition_to?(:detached)
send_detach_protocol_message
end
end
- # Commence presence SYNC if applicable
- def sync(attached_protocol_message)
+ # Channel is attached, notify presence if sync is expected
+ def attached(attached_protocol_message)
if attached_protocol_message.has_presence_flag?
- channel.presence.sync_started
+ channel.presence.manager.sync_expected
else
- channel.presence.sync_completed
+ channel.presence.manager.sync_not_expected
end
end
# An error has occurred on the channel
def emit_error(error)
@@ -57,10 +51,49 @@
# Detach a channel as a result of an error
def suspend(error)
channel.transition_state_machine! :detaching, error
end
+ # When a channel is no longer attached or has failed,
+ # all messages awaiting an ACK response should fail immediately
+ def fail_messages_awaiting_ack(error)
+ # Allow a short time for other queued operations to complete before failing all messages
+ EventMachine.add_timer(0.1) do
+ error = Ably::Exceptions::MessageDeliveryError.new('Channel is no longer in a state suitable to deliver this message to the server') unless error
+ fail_messages_in_queue connection.__pending_message_ack_queue__, error
+ fail_messages_in_queue connection.__outgoing_message_queue__, error
+ end
+ end
+
+ def fail_messages_in_queue(queue, error)
+ queue.delete_if do |protocol_message|
+ if protocol_message.channel == channel.name
+ nack_messages protocol_message, error
+ true
+ end
+ end
+ end
+
+ def nack_messages(protocol_message, error)
+ (protocol_message.messages + protocol_message.presence).each do |message|
+ logger.debug "Calling NACK failure callbacks for #{message.class.name} - #{message.to_json}, protocol message: #{protocol_message}"
+ message.fail message, error
+ end
+ logger.debug "Calling NACK failure callbacks for #{protocol_message.class.name} - #{protocol_message.to_json}"
+ protocol_message.fail protocol_message, error
+ end
+
+ def drop_pending_queue_from_ack(ack_protocol_message)
+ message_serial_up_to = ack_protocol_message.message_serial + ack_protocol_message.count - 1
+ connection.__pending_message_ack_queue__.drop_while do |protocol_message|
+ if protocol_message.message_serial <= message_serial_up_to
+ yield protocol_message
+ true
+ end
+ end
+ end
+
private
attr_reader :channel, :connection
def_delegators :channel, :can_transition_to?
@@ -80,9 +113,19 @@
def send_state_change_protocol_message(state)
connection.send_protocol_message(
action: state.to_i,
channel: channel.name
)
+ end
+
+ def setup_connection_event_handlers
+ connection.unsafe_on(:closed) do
+ channel.transition_state_machine :detaching if can_transition_to?(:detaching)
+ end
+
+ connection.unsafe_on(:failed) do |error|
+ channel.transition_state_machine :failed, error if can_transition_to?(:failed)
+ end
end
def logger
connection.logger
end