lib/right_amqp/amqp/client.rb in right_amqp-0.7.0 vs lib/right_amqp/amqp/client.rb in right_amqp-0.8.3

- old
+ new

@@ -54,11 +54,11 @@ # Make callback now that handshake with the broker has completed # The 'connected' status callback happens before the handshake is done and if it results in # a lot of activity it might prevent EM from being able to call the code handling the # incoming handshake packet in a timely fashion causing the broker to close the connection - @connection_status.call(:ready) if @connection_status && frame.payload.is_a?(AMQP::Protocol::Connection::Start) + @connection_status.call(:ready) if @connection_status && frame.payload.is_a?(AMQP::Protocol::Connection::OpenOk) end end def self.client @client ||= BasicClient @@ -97,10 +97,11 @@ @on_disconnect = method(:disconnected) @reconnecting = false end @connected = true + logger.debug("[amqp] Connected to broker #{@settings[:identity]}") @connection_status.call(:connected) if @connection_status @buf = Buffer.new send_data HEADER send_data [1, 1, VERSION_MAJOR, VERSION_MINOR].pack('C4') @@ -128,12 +129,17 @@ @heartbeat_timer.cancel if @heartbeat_timer @heartbeat_timer = EM::PeriodicTimer.new(@settings[:heartbeat]) do begin if connected? now = Time.now - if @last_data_received < (now - (@settings[:heartbeat] * timeout_factor)) - data_received = (now - @last_data_received).to_i if @last_data_received + if @last_data_received.nil? + # This is something of an anomaly; not clear how this condition is reached + logger.info("[amqp] Reconnecting to broker #{@settings[:identity]} due to heartbeat timeout " + + "with no data having been received") + close_connection # which results in an unbind and an automatic reconnect + elsif @last_data_received < (now - (@settings[:heartbeat] * timeout_factor)) + data_received = (now - @last_data_received).to_i heartbeat_received = (now - @last_heartbeat_received).to_i if @last_heartbeat_received heartbeat_sent = (now - @last_heartbeat_sent).to_i if @last_heartbeat_sent logger.info("[amqp] Reconnecting to broker #{@settings[:identity]} due to heartbeat timeout: " + "last data received #{data_received.inspect} sec ago, " + "last heartbeat received #{heartbeat_received.inspect} sec ago, " + @@ -162,10 +168,11 @@ end def unbind log 'disconnected' @connected = false + logger.debug("[amqp] Disconnected from broker #{@settings[:identity]}") EM.next_tick{ @on_disconnect.call } end def add_channel mq (@_channel_mutex ||= Mutex.new).synchronize do @@ -271,10 +278,10 @@ raise ::AMQP::Error, "Could not interpret :reconnect_delay => #{again.inspect}; expected nil, true, or Numeric" end end log 'reconnecting' - logger.info("[amqp] Attempting to reconnect to #{@settings[:identity]}") + logger.info("[amqp] Attempting to reconnect to broker #{@settings[:identity]}") EM.reconnect(@settings[:host], @settings[:port], self) rescue Exception => e logger.exception("[amqp] Failed to reconnect", e, :trace) failed end