lib/submodules/ably-ruby/lib/ably/realtime/channel/channel_manager.rb in ably-rest-0.8.3 vs lib/submodules/ably-ruby/lib/ably/realtime/channel/channel_manager.rb in ably-rest-0.8.5
- old
+ new
@@ -25,11 +25,11 @@
end
# Commence attachment
def detach(error = nil)
if connection.closed? || connection.connecting? || connection.suspended?
- channel.transition_state_machine :detached, error
+ channel.transition_state_machine :detached, reason: error
elsif can_transition_to?(:detached)
send_detach_protocol_message
end
end
@@ -49,11 +49,11 @@
channel.emit :error, error
end
# Detach a channel as a result of an error
def suspend(error)
- channel.transition_state_machine! :detaching, error
+ channel.transition_state_machine! :detaching, reason: error
end
# When a channel is no longer attached or has failed,
# all messages awaiting an ACK response should fail immediately
def fail_messages_awaiting_ack(error)
@@ -75,14 +75,14 @@
end
def nack_messages(protocol_message, error)
(protocol_message.messages + protocol_message.presence).each do |message|
logger.debug "Calling NACK failure callbacks for #{message.class.name} - #{message.to_json}, protocol message: #{protocol_message}"
- message.fail message, error
+ message.fail error
end
logger.debug "Calling NACK failure callbacks for #{protocol_message.class.name} - #{protocol_message.to_json}"
- protocol_message.fail protocol_message, error
+ protocol_message.fail error
end
def drop_pending_queue_from_ack(ack_protocol_message)
message_serial_up_to = ack_protocol_message.message_serial + ack_protocol_message.count - 1
connection.__pending_message_ack_queue__.drop_while do |protocol_message|
@@ -116,24 +116,44 @@
action: state.to_i,
channel: channel.name
)
end
+ # Any message sent before an ACK/NACK was received on the previous transport
+ # needs to be resent to the Ably service so that a subsequent ACK/NACK is received.
+ # It is up to Ably to ensure that duplicate messages are not retransmitted on the channel
+ # base on the serial numbers
+ #
+ # @api private
+ def resend_pending_message_ack_queue
+ connection.__pending_message_ack_queue__.delete_if do |protocol_message|
+ if protocol_message.channel == channel.name
+ connection.__outgoing_message_queue__ << protocol_message
+ connection.__outgoing_protocol_msgbus__.publish :protocol_message
+ true
+ end
+ end
+ end
+
def setup_connection_event_handlers
connection.unsafe_on(:closed) do
channel.transition_state_machine :detaching if can_transition_to?(:detaching)
end
connection.unsafe_on(:suspended) do |error|
if can_transition_to?(:detaching)
- channel.transition_state_machine :detaching, Ably::Exceptions::ConnectionSuspended.new('Connection suspended', nil, 80002, error)
+ channel.transition_state_machine :detaching, reason: Ably::Exceptions::ConnectionSuspended.new('Connection suspended', nil, 80002, error)
end
end
connection.unsafe_on(:failed) do |error|
- if can_transition_to?(:failed)
- channel.transition_state_machine :failed, Ably::Exceptions::ConnectionFailed.new('Connection failed', nil, 80002, error)
+ if can_transition_to?(:failed) && !channel.detached?
+ channel.transition_state_machine :failed, reason: Ably::Exceptions::ConnectionFailed.new('Connection failed', nil, 80002, error)
end
+ end
+
+ connection.unsafe_on(:connected) do |error|
+ resend_pending_message_ack_queue
end
end
def logger
connection.logger