lib/amq/client/async/adapters/event_machine.rb in amq-client-0.8.1 vs lib/amq/client/async/adapters/event_machine.rb in amq-client-0.8.2
- old
+ new
@@ -171,12 +171,11 @@
self.reset
self.set_pending_connect_timeout((@settings[:timeout] || 3).to_f) unless defined?(JRUBY_VERSION)
if self.heartbeat_interval > 0
- @last_server_heartbeat = Time.now
- EventMachine.add_periodic_timer(self.heartbeat_interval, &method(:send_heartbeat))
+ self.initialize_heartbeat_sender
end
end # initialize(*args)
@@ -263,12 +262,16 @@
self.upgrade_to_tls_if_necessary
end
# now we can set it. MK.
@had_successfully_connected_before = true
- @reconnecting = false
+ @reconnecting = false
+ @handling_skipped_hearbeats = false
+ @last_server_heartbeat = Time.now
+ self.initialize_heartbeat_sender if self.heartbeat_interval > 0
+
self.handshake
end
# @private
def close_connection(*args)
@@ -285,10 +288,11 @@
# * Initial TCP connection fails
# @private
def unbind(exception = nil)
if !@tcp_connection_established && !@had_successfully_connected_before && !@intentionally_closing_connection
@tcp_connection_failed = true
+ logger.error "[amqp] Detected TCP connection failure"
self.tcp_connection_failed
end
closing!
@tcp_connection_established = false
@@ -344,10 +348,27 @@
self.close_connection(true)
self.reset
closed!
end # disconnection_successful
+ # Called when time since last server heartbeat received is greater or equal to the
+ # heartbeat interval set via :heartbeat_interval option on connection.
+ #
+ # @api plugin
+ def handle_skipped_hearbeats
+ if !@handling_skipped_hearbeats && @tcp_connection_established && !@intentionally_closing_connection
+ @handling_skipped_hearbeats = true
+ @heartbeats_timer.cancel
+ self.run_skipped_heartbeats_callbacks
+ end
+ end
+
+ # @private
+ def initialize_heartbeat_sender
+ @last_server_heartbeat = Time.now
+ @heartbeats_timer = EventMachine::PeriodicTimer.new(self.heartbeat_interval, &method(:send_heartbeat))
+ end
self.handle(Protocol::Connection::Start) do |connection, frame|
connection.handle_start(frame.decode_payload)