lib/right_amqp/amqp/client.rb in right_amqp-0.3.1 vs lib/right_amqp/amqp/client.rb in right_amqp-0.3.2
- old
+ new
@@ -4,10 +4,11 @@
module AMQP
class Error < StandardError; end
module BasicClient
def process_frame frame
+ @last_data_received = Time.now
if mq = channels[frame.channel]
mq.process_frame(frame)
return
end
@@ -46,12 +47,11 @@
@on_disconnect.call if @on_disconnect
end
when Frame::Heartbeat
logger.debug("[amqp] Received heartbeat from broker #{@settings[:identity]}")
- @last_server_heartbeat = Time.now
-
+ @last_heartbeat_received = @last_data_received
end
# Make callback now that handshake with the broker has completed
# The 'connected' status callback happens before the handshake is done and if it results in
# a lot of activity it might prevent EM from being able to call the code handling the
@@ -109,25 +109,52 @@
init_heartbeat if (@settings[:heartbeat] = heartbeat.to_i) > 0
end
end
def init_heartbeat
- logger.debug("[amqp] Initializing heartbeat for broker #{@settings[:identity]} to #{@settings[:heartbeat]}")
- @last_server_heartbeat = Time.now
+ # Randomly offset start of heartbeat timer to help separate heartbeat
+ # activity when there are multiple broker connections active
+ EM.add_timer(rand(@settings[:heartbeat])) do
+ begin
+ # While connected, a heartbeat or some other data is expected to be received from
+ # the broker at least every 2 x :heartbeat seconds, otherwise the connection is
+ # assumed to be broken and therefore is closed to cause an automatic reconnect.
+ # While connected, this client will send a heartbeat every :heartbeat
+ # seconds regardless of any other send activity. The RabbitMQ broker will behave
+ # similarly and drop the connection if it does not receive a heartbeat or other
+ # data in time.
+ logger.info("[amqp] Initializing heartbeat for broker #{@settings[:identity]} to #{@settings[:heartbeat]} sec")
- @timer.cancel if @timer
- @timer = EM::PeriodicTimer.new(@settings[:heartbeat]) do
- if connected?
- if @last_server_heartbeat < (Time.now - (@settings[:heartbeat] * 2))
- log "Reconnecting due to missing server heartbeats"
- logger.info("[amqp] Reconnecting to broker #{@settings[:identity]} due to missing server heartbeats")
- reconnect(true)
- else
- logger.debug("[amqp] Sending heartbeat to broker #{@settings[:identity]}")
- @last_server_heartbeat = Time.now
- send AMQP::Frame::Heartbeat.new, :channel => 0
+ timeout_factor = 2
+ @heartbeat_timer.cancel if @heartbeat_timer
+ @heartbeat_timer = EM::PeriodicTimer.new(@settings[:heartbeat]) do
+ begin
+ if connected?
+ now = Time.now
+ if @last_data_received < (now - (@settings[:heartbeat] * timeout_factor))
+ data_received = (now - @last_data_received).to_i if @last_data_received
+ heartbeat_received = (now - @last_heartbeat_received).to_i if @last_heartbeat_received
+ heartbeat_sent = (now - @last_heartbeat_sent).to_i if @last_heartbeat_sent
+ logger.info("[amqp] Reconnecting to broker #{@settings[:identity]} due to heartbeat timeout: " +
+ "last data received #{data_received.inspect} sec ago, " +
+ "last heartbeat received #{heartbeat_received.inspect} sec ago, " +
+ "last heartbeat sent #{heartbeat_sent.inspect} sec ago")
+ close_connection # which results in an unbind and an automatic reconnect
+ else
+ logger.debug("[amqp] Sending heartbeat to broker #{@settings[:identity]}")
+ send AMQP::Frame::Heartbeat.new, :channel => 0
+ @last_heartbeat_sent = Time.now
+ end
+ else
+ logger.debug("[amqp] Skipping heartbeat check for broker #{@settings[:identity]} because disconnected")
+ end
+ rescue Exception => e
+ logger.error("[amqp] Failed heartbeat check (#{e})\n" + e.backtrace.join("\n"))
+ end
end
+ rescue Exception => e
+ logger.error("[amqp] Failed heartbeat initialization (#{e})\n" + e.backtrace.join("\n"))
end
end
end
def connected?
@@ -191,10 +218,12 @@
# super
# end
#:startdoc:
def close &on_disconnect
+ @heartbeat_timer.cancel if @heartbeat_timer
+ @heartbeat_timer = nil
if on_disconnect
@closing = true
@on_disconnect = proc{
on_disconnect.call
@closing = false
@@ -216,11 +245,17 @@
end
def reconnect force = false
if @reconnecting and not force
# Wait after first reconnect attempt and in between each subsequent attempt
- EM.add_timer(@settings[:reconnect_interval] || 5) { reconnect(true) }
+ EM.add_timer(@settings[:reconnect_interval] || 5) do
+ begin
+ reconnect(true)
+ rescue Exception => e
+ logger.exception("[amqp] Failed to reconnect", e, :trace)
+ end
+ end
return
end
unless @reconnecting
@deferred_status = nil
@@ -234,10 +269,16 @@
again = @settings[:reconnect_delay]
again = again.call if again.is_a?(Proc)
if again.is_a?(Numeric)
# Wait before making initial reconnect attempt
- EM.add_timer(again) { reconnect(true) }
+ EM.add_timer(again) do
+ begin
+ reconnect(true)
+ rescue Exception => e
+ logger.exception("[amqp] Failed to reconnect", e, :trace)
+ end
+ end
return
elsif ![nil, true].include?(again)
raise ::AMQP::Error, "Could not interpret :reconnect_delay => #{again.inspect}; expected nil, true, or Numeric"
end
end