lib/amqp/session.rb in amqp-1.3.0 vs lib/amqp/session.rb in amqp-1.4.0

- old
+ new

@@ -394,11 +394,23 @@ end # auto_recover # @endgroup + # @group Blocked connection notifications + def on_blocked(&fn) + @on_blocked = fn + end + + def on_unblocked(&fn) + @on_unblocked = fn + end + + # @endgroup + + # # Implementation # # Overrides TCP connection failure exception to one that inherits from AMQP::Error @@ -598,11 +610,11 @@ end # now we can set it. MK. @had_successfully_connected_before = true @reconnecting = false - @handling_skipped_hearbeats = false + @handling_skipped_heartbeats = false @last_server_heartbeat = Time.now self.handshake end @@ -685,13 +697,13 @@ # Called when time since last server heartbeat received is greater or equal to the # heartbeat interval set via :heartbeat_interval option on connection. # # @api plugin - def handle_skipped_hearbeats - if !@handling_skipped_hearbeats && @tcp_connection_established && !@intentionally_closing_connection - @handling_skipped_hearbeats = true + def handle_skipped_heartbeats + if !@handling_skipped_heartbeats && @tcp_connection_established && !@intentionally_closing_connection + @handling_skipped_heartbeats = true self.cancel_heartbeat_sender self.run_skipped_heartbeats_callbacks end end @@ -961,14 +973,14 @@ end # Sends a heartbeat frame if connection is open. # @api plugin def send_heartbeat - if tcp_connection_established? && !@handling_skipped_hearbeats && @last_server_heartbeat + if tcp_connection_established? && !@handling_skipped_heartbeats && @last_server_heartbeat if @last_server_heartbeat < (Time.now - (self.heartbeat_interval * 2)) && !reconnecting? logger.error "[amqp] Detected missing server heartbeats" - self.handle_skipped_hearbeats + self.handle_skipped_heartbeats end send_frame(AMQ::Protocol::HeartbeatFrame) end end # send_heartbeat @@ -1053,11 +1065,18 @@ closed! self.disconnection_successful end # handle_close_ok(close_ok) + def handle_connection_blocked(connection_blocked) + @on_blocked.call(self, connection_blocked) if @on_blocked + end + def handle_connection_unblocked(connection_unblocked) + @on_unblocked.call(self, connection_unblocked) if @on_unblocked + end + protected def negotiate_heartbeat_value(client_value, server_value) if client_value == 0 || server_value == 0 [client_value, server_value].max @@ -1123,9 +1142,16 @@ self.handle(AMQ::Protocol::Connection::CloseOk) do |connection, frame| connection.handle_close_ok(frame.decode_payload) end + self.handle(AMQ::Protocol::Connection::Blocked) do |connection, frame| + connection.handle_connection_blocked(frame.decode_payload) + end + + self.handle(AMQ::Protocol::Connection::Unblocked) do |connection, frame| + connection.handle_connection_unblocked(frame.decode_payload) + end protected