lib/ably/realtime/client/incoming_message_dispatcher.rb in ably-0.8.4 vs lib/ably/realtime/client/incoming_message_dispatcher.rb in ably-0.8.5

- old
+ new

@@ -66,11 +66,17 @@ logger.warn "NACK received: #{protocol_message}" nack_pending_queue_for_message_serial(protocol_message) if protocol_message.has_message_serial? when ACTION.Connect when ACTION.Connected - connection.transition_state_machine :connected, reason: protocol_message.error, protocol_message: protocol_message unless connection.connected? + if connection.disconnected? || connection.closing? || connection.closed? || connection.failed? + logger.debug "Incoming CONNECTED ProtocolMessage discarded as connection has moved on and is in state: #{connection.state}" + elsif connection.connected? + logger.error "CONNECTED ProtocolMessage should not have been received when the connection is in the CONNECTED state" + else + connection.transition_state_machine :connected, reason: protocol_message.error, protocol_message: protocol_message + end when ACTION.Disconnect, ACTION.Disconnected connection.transition_state_machine :disconnected, reason: protocol_message.error unless connection.disconnected? when ACTION.Close @@ -161,20 +167,20 @@ end def nack_messages(messages, protocol_message) messages.each do |message| logger.debug "Calling NACK failure callbacks for #{message.class.name} - #{message.to_json}, protocol message: #{protocol_message}" - message.fail message, protocol_message.error + message.fail protocol_message.error end end def drop_pending_queue_from_ack(ack_protocol_message) message_serial_up_to = ack_protocol_message.message_serial + ack_protocol_message.count - 1 - connection.__pending_message_ack_queue__.drop_while do |protocol_message| - if protocol_message.message_serial <= message_serial_up_to - yield protocol_message - true - end + + while !connection.__pending_message_ack_queue__.empty? + next_message = connection.__pending_message_ack_queue__.first + return if next_message.message_serial > message_serial_up_to + yield connection.__pending_message_ack_queue__.shift end end def subscribe_to_incoming_protocol_messages connection.__incoming_protocol_msgbus__.subscribe(:protocol_message) do |*args|