lib/submodules/ably-ruby/lib/ably/realtime/channel/channel_manager.rb in ably-rest-0.8.3 vs lib/submodules/ably-ruby/lib/ably/realtime/channel/channel_manager.rb in ably-rest-0.8.5

- old
+ new

@@ -25,11 +25,11 @@ end # Commence attachment def detach(error = nil) if connection.closed? || connection.connecting? || connection.suspended? - channel.transition_state_machine :detached, error + channel.transition_state_machine :detached, reason: error elsif can_transition_to?(:detached) send_detach_protocol_message end end @@ -49,11 +49,11 @@ channel.emit :error, error end # Detach a channel as a result of an error def suspend(error) - channel.transition_state_machine! :detaching, error + channel.transition_state_machine! :detaching, reason: error end # When a channel is no longer attached or has failed, # all messages awaiting an ACK response should fail immediately def fail_messages_awaiting_ack(error) @@ -75,14 +75,14 @@ end def nack_messages(protocol_message, error) (protocol_message.messages + protocol_message.presence).each do |message| logger.debug "Calling NACK failure callbacks for #{message.class.name} - #{message.to_json}, protocol message: #{protocol_message}" - message.fail message, error + message.fail error end logger.debug "Calling NACK failure callbacks for #{protocol_message.class.name} - #{protocol_message.to_json}" - protocol_message.fail protocol_message, error + protocol_message.fail error end def drop_pending_queue_from_ack(ack_protocol_message) message_serial_up_to = ack_protocol_message.message_serial + ack_protocol_message.count - 1 connection.__pending_message_ack_queue__.drop_while do |protocol_message| @@ -116,24 +116,44 @@ action: state.to_i, channel: channel.name ) end + # Any message sent before an ACK/NACK was received on the previous transport + # needs to be resent to the Ably service so that a subsequent ACK/NACK is received. + # It is up to Ably to ensure that duplicate messages are not retransmitted on the channel + # base on the serial numbers + # + # @api private + def resend_pending_message_ack_queue + connection.__pending_message_ack_queue__.delete_if do |protocol_message| + if protocol_message.channel == channel.name + connection.__outgoing_message_queue__ << protocol_message + connection.__outgoing_protocol_msgbus__.publish :protocol_message + true + end + end + end + def setup_connection_event_handlers connection.unsafe_on(:closed) do channel.transition_state_machine :detaching if can_transition_to?(:detaching) end connection.unsafe_on(:suspended) do |error| if can_transition_to?(:detaching) - channel.transition_state_machine :detaching, Ably::Exceptions::ConnectionSuspended.new('Connection suspended', nil, 80002, error) + channel.transition_state_machine :detaching, reason: Ably::Exceptions::ConnectionSuspended.new('Connection suspended', nil, 80002, error) end end connection.unsafe_on(:failed) do |error| - if can_transition_to?(:failed) - channel.transition_state_machine :failed, Ably::Exceptions::ConnectionFailed.new('Connection failed', nil, 80002, error) + if can_transition_to?(:failed) && !channel.detached? + channel.transition_state_machine :failed, reason: Ably::Exceptions::ConnectionFailed.new('Connection failed', nil, 80002, error) end + end + + connection.unsafe_on(:connected) do |error| + resend_pending_message_ack_queue end end def logger connection.logger