lib/amqp/session.rb in amqp-1.3.0 vs lib/amqp/session.rb in amqp-1.4.0
- old
+ new
@@ -394,11 +394,23 @@
end # auto_recover
# @endgroup
+ # @group Blocked connection notifications
+ def on_blocked(&fn)
+ @on_blocked = fn
+ end
+
+ def on_unblocked(&fn)
+ @on_unblocked = fn
+ end
+
+ # @endgroup
+
+
#
# Implementation
#
# Overrides TCP connection failure exception to one that inherits from AMQP::Error
@@ -598,11 +610,11 @@
end
# now we can set it. MK.
@had_successfully_connected_before = true
@reconnecting = false
- @handling_skipped_hearbeats = false
+ @handling_skipped_heartbeats = false
@last_server_heartbeat = Time.now
self.handshake
end
@@ -685,13 +697,13 @@
# Called when time since last server heartbeat received is greater or equal to the
# heartbeat interval set via :heartbeat_interval option on connection.
#
# @api plugin
- def handle_skipped_hearbeats
- if !@handling_skipped_hearbeats && @tcp_connection_established && !@intentionally_closing_connection
- @handling_skipped_hearbeats = true
+ def handle_skipped_heartbeats
+ if !@handling_skipped_heartbeats && @tcp_connection_established && !@intentionally_closing_connection
+ @handling_skipped_heartbeats = true
self.cancel_heartbeat_sender
self.run_skipped_heartbeats_callbacks
end
end
@@ -961,14 +973,14 @@
end
# Sends a heartbeat frame if connection is open.
# @api plugin
def send_heartbeat
- if tcp_connection_established? && !@handling_skipped_hearbeats && @last_server_heartbeat
+ if tcp_connection_established? && !@handling_skipped_heartbeats && @last_server_heartbeat
if @last_server_heartbeat < (Time.now - (self.heartbeat_interval * 2)) && !reconnecting?
logger.error "[amqp] Detected missing server heartbeats"
- self.handle_skipped_hearbeats
+ self.handle_skipped_heartbeats
end
send_frame(AMQ::Protocol::HeartbeatFrame)
end
end # send_heartbeat
@@ -1053,11 +1065,18 @@
closed!
self.disconnection_successful
end # handle_close_ok(close_ok)
+ def handle_connection_blocked(connection_blocked)
+ @on_blocked.call(self, connection_blocked) if @on_blocked
+ end
+ def handle_connection_unblocked(connection_unblocked)
+ @on_unblocked.call(self, connection_unblocked) if @on_unblocked
+ end
+
protected
def negotiate_heartbeat_value(client_value, server_value)
if client_value == 0 || server_value == 0
[client_value, server_value].max
@@ -1123,9 +1142,16 @@
self.handle(AMQ::Protocol::Connection::CloseOk) do |connection, frame|
connection.handle_close_ok(frame.decode_payload)
end
+ self.handle(AMQ::Protocol::Connection::Blocked) do |connection, frame|
+ connection.handle_connection_blocked(frame.decode_payload)
+ end
+
+ self.handle(AMQ::Protocol::Connection::Unblocked) do |connection, frame|
+ connection.handle_connection_unblocked(frame.decode_payload)
+ end
protected