lib/ably/realtime/client/incoming_message_dispatcher.rb in ably-0.8.4 vs lib/ably/realtime/client/incoming_message_dispatcher.rb in ably-0.8.5
- old
+ new
@@ -66,11 +66,17 @@
logger.warn "NACK received: #{protocol_message}"
nack_pending_queue_for_message_serial(protocol_message) if protocol_message.has_message_serial?
when ACTION.Connect
when ACTION.Connected
- connection.transition_state_machine :connected, reason: protocol_message.error, protocol_message: protocol_message unless connection.connected?
+ if connection.disconnected? || connection.closing? || connection.closed? || connection.failed?
+ logger.debug "Incoming CONNECTED ProtocolMessage discarded as connection has moved on and is in state: #{connection.state}"
+ elsif connection.connected?
+ logger.error "CONNECTED ProtocolMessage should not have been received when the connection is in the CONNECTED state"
+ else
+ connection.transition_state_machine :connected, reason: protocol_message.error, protocol_message: protocol_message
+ end
when ACTION.Disconnect, ACTION.Disconnected
connection.transition_state_machine :disconnected, reason: protocol_message.error unless connection.disconnected?
when ACTION.Close
@@ -161,20 +167,20 @@
end
def nack_messages(messages, protocol_message)
messages.each do |message|
logger.debug "Calling NACK failure callbacks for #{message.class.name} - #{message.to_json}, protocol message: #{protocol_message}"
- message.fail message, protocol_message.error
+ message.fail protocol_message.error
end
end
def drop_pending_queue_from_ack(ack_protocol_message)
message_serial_up_to = ack_protocol_message.message_serial + ack_protocol_message.count - 1
- connection.__pending_message_ack_queue__.drop_while do |protocol_message|
- if protocol_message.message_serial <= message_serial_up_to
- yield protocol_message
- true
- end
+
+ while !connection.__pending_message_ack_queue__.empty?
+ next_message = connection.__pending_message_ack_queue__.first
+ return if next_message.message_serial > message_serial_up_to
+ yield connection.__pending_message_ack_queue__.shift
end
end
def subscribe_to_incoming_protocol_messages
connection.__incoming_protocol_msgbus__.subscribe(:protocol_message) do |*args|