lib/amq/client/async/adapters/event_machine.rb in amq-client-0.8.1 vs lib/amq/client/async/adapters/event_machine.rb in amq-client-0.8.2

- old
+ new

@@ -171,12 +171,11 @@ self.reset self.set_pending_connect_timeout((@settings[:timeout] || 3).to_f) unless defined?(JRUBY_VERSION) if self.heartbeat_interval > 0 - @last_server_heartbeat = Time.now - EventMachine.add_periodic_timer(self.heartbeat_interval, &method(:send_heartbeat)) + self.initialize_heartbeat_sender end end # initialize(*args) @@ -263,12 +262,16 @@ self.upgrade_to_tls_if_necessary end # now we can set it. MK. @had_successfully_connected_before = true - @reconnecting = false + @reconnecting = false + @handling_skipped_hearbeats = false + @last_server_heartbeat = Time.now + self.initialize_heartbeat_sender if self.heartbeat_interval > 0 + self.handshake end # @private def close_connection(*args) @@ -285,10 +288,11 @@ # * Initial TCP connection fails # @private def unbind(exception = nil) if !@tcp_connection_established && !@had_successfully_connected_before && !@intentionally_closing_connection @tcp_connection_failed = true + logger.error "[amqp] Detected TCP connection failure" self.tcp_connection_failed end closing! @tcp_connection_established = false @@ -344,10 +348,27 @@ self.close_connection(true) self.reset closed! end # disconnection_successful + # Called when time since last server heartbeat received is greater or equal to the + # heartbeat interval set via :heartbeat_interval option on connection. + # + # @api plugin + def handle_skipped_hearbeats + if !@handling_skipped_hearbeats && @tcp_connection_established && !@intentionally_closing_connection + @handling_skipped_hearbeats = true + @heartbeats_timer.cancel + self.run_skipped_heartbeats_callbacks + end + end + + # @private + def initialize_heartbeat_sender + @last_server_heartbeat = Time.now + @heartbeats_timer = EventMachine::PeriodicTimer.new(self.heartbeat_interval, &method(:send_heartbeat)) + end self.handle(Protocol::Connection::Start) do |connection, frame| connection.handle_start(frame.decode_payload)