lib/right_amqp/amqp/client.rb in right_amqp-0.3.1 vs lib/right_amqp/amqp/client.rb in right_amqp-0.3.2

- old
+ new

@@ -4,10 +4,11 @@ module AMQP class Error < StandardError; end module BasicClient def process_frame frame + @last_data_received = Time.now if mq = channels[frame.channel] mq.process_frame(frame) return end @@ -46,12 +47,11 @@ @on_disconnect.call if @on_disconnect end when Frame::Heartbeat logger.debug("[amqp] Received heartbeat from broker #{@settings[:identity]}") - @last_server_heartbeat = Time.now - + @last_heartbeat_received = @last_data_received end # Make callback now that handshake with the broker has completed # The 'connected' status callback happens before the handshake is done and if it results in # a lot of activity it might prevent EM from being able to call the code handling the @@ -109,25 +109,52 @@ init_heartbeat if (@settings[:heartbeat] = heartbeat.to_i) > 0 end end def init_heartbeat - logger.debug("[amqp] Initializing heartbeat for broker #{@settings[:identity]} to #{@settings[:heartbeat]}") - @last_server_heartbeat = Time.now + # Randomly offset start of heartbeat timer to help separate heartbeat + # activity when there are multiple broker connections active + EM.add_timer(rand(@settings[:heartbeat])) do + begin + # While connected, a heartbeat or some other data is expected to be received from + # the broker at least every 2 x :heartbeat seconds, otherwise the connection is + # assumed to be broken and therefore is closed to cause an automatic reconnect. + # While connected, this client will send a heartbeat every :heartbeat + # seconds regardless of any other send activity. The RabbitMQ broker will behave + # similarly and drop the connection if it does not receive a heartbeat or other + # data in time. + logger.info("[amqp] Initializing heartbeat for broker #{@settings[:identity]} to #{@settings[:heartbeat]} sec") - @timer.cancel if @timer - @timer = EM::PeriodicTimer.new(@settings[:heartbeat]) do - if connected? - if @last_server_heartbeat < (Time.now - (@settings[:heartbeat] * 2)) - log "Reconnecting due to missing server heartbeats" - logger.info("[amqp] Reconnecting to broker #{@settings[:identity]} due to missing server heartbeats") - reconnect(true) - else - logger.debug("[amqp] Sending heartbeat to broker #{@settings[:identity]}") - @last_server_heartbeat = Time.now - send AMQP::Frame::Heartbeat.new, :channel => 0 + timeout_factor = 2 + @heartbeat_timer.cancel if @heartbeat_timer + @heartbeat_timer = EM::PeriodicTimer.new(@settings[:heartbeat]) do + begin + if connected? + now = Time.now + if @last_data_received < (now - (@settings[:heartbeat] * timeout_factor)) + data_received = (now - @last_data_received).to_i if @last_data_received + heartbeat_received = (now - @last_heartbeat_received).to_i if @last_heartbeat_received + heartbeat_sent = (now - @last_heartbeat_sent).to_i if @last_heartbeat_sent + logger.info("[amqp] Reconnecting to broker #{@settings[:identity]} due to heartbeat timeout: " + + "last data received #{data_received.inspect} sec ago, " + + "last heartbeat received #{heartbeat_received.inspect} sec ago, " + + "last heartbeat sent #{heartbeat_sent.inspect} sec ago") + close_connection # which results in an unbind and an automatic reconnect + else + logger.debug("[amqp] Sending heartbeat to broker #{@settings[:identity]}") + send AMQP::Frame::Heartbeat.new, :channel => 0 + @last_heartbeat_sent = Time.now + end + else + logger.debug("[amqp] Skipping heartbeat check for broker #{@settings[:identity]} because disconnected") + end + rescue Exception => e + logger.error("[amqp] Failed heartbeat check (#{e})\n" + e.backtrace.join("\n")) + end end + rescue Exception => e + logger.error("[amqp] Failed heartbeat initialization (#{e})\n" + e.backtrace.join("\n")) end end end def connected? @@ -191,10 +218,12 @@ # super # end #:startdoc: def close &on_disconnect + @heartbeat_timer.cancel if @heartbeat_timer + @heartbeat_timer = nil if on_disconnect @closing = true @on_disconnect = proc{ on_disconnect.call @closing = false @@ -216,11 +245,17 @@ end def reconnect force = false if @reconnecting and not force # Wait after first reconnect attempt and in between each subsequent attempt - EM.add_timer(@settings[:reconnect_interval] || 5) { reconnect(true) } + EM.add_timer(@settings[:reconnect_interval] || 5) do + begin + reconnect(true) + rescue Exception => e + logger.exception("[amqp] Failed to reconnect", e, :trace) + end + end return end unless @reconnecting @deferred_status = nil @@ -234,10 +269,16 @@ again = @settings[:reconnect_delay] again = again.call if again.is_a?(Proc) if again.is_a?(Numeric) # Wait before making initial reconnect attempt - EM.add_timer(again) { reconnect(true) } + EM.add_timer(again) do + begin + reconnect(true) + rescue Exception => e + logger.exception("[amqp] Failed to reconnect", e, :trace) + end + end return elsif ![nil, true].include?(again) raise ::AMQP::Error, "Could not interpret :reconnect_delay => #{again.inspect}; expected nil, true, or Numeric" end end