lib/right_amqp/amqp/client.rb in right_amqp-0.7.0 vs lib/right_amqp/amqp/client.rb in right_amqp-0.8.3
- old
+ new
@@ -54,11 +54,11 @@
# Make callback now that handshake with the broker has completed
# The 'connected' status callback happens before the handshake is done and if it results in
# a lot of activity it might prevent EM from being able to call the code handling the
# incoming handshake packet in a timely fashion causing the broker to close the connection
- @connection_status.call(:ready) if @connection_status && frame.payload.is_a?(AMQP::Protocol::Connection::Start)
+ @connection_status.call(:ready) if @connection_status && frame.payload.is_a?(AMQP::Protocol::Connection::OpenOk)
end
end
def self.client
@client ||= BasicClient
@@ -97,10 +97,11 @@
@on_disconnect = method(:disconnected)
@reconnecting = false
end
@connected = true
+ logger.debug("[amqp] Connected to broker #{@settings[:identity]}")
@connection_status.call(:connected) if @connection_status
@buf = Buffer.new
send_data HEADER
send_data [1, 1, VERSION_MAJOR, VERSION_MINOR].pack('C4')
@@ -128,12 +129,17 @@
@heartbeat_timer.cancel if @heartbeat_timer
@heartbeat_timer = EM::PeriodicTimer.new(@settings[:heartbeat]) do
begin
if connected?
now = Time.now
- if @last_data_received < (now - (@settings[:heartbeat] * timeout_factor))
- data_received = (now - @last_data_received).to_i if @last_data_received
+ if @last_data_received.nil?
+ # This is something of an anomaly; not clear how this condition is reached
+ logger.info("[amqp] Reconnecting to broker #{@settings[:identity]} due to heartbeat timeout " +
+ "with no data having been received")
+ close_connection # which results in an unbind and an automatic reconnect
+ elsif @last_data_received < (now - (@settings[:heartbeat] * timeout_factor))
+ data_received = (now - @last_data_received).to_i
heartbeat_received = (now - @last_heartbeat_received).to_i if @last_heartbeat_received
heartbeat_sent = (now - @last_heartbeat_sent).to_i if @last_heartbeat_sent
logger.info("[amqp] Reconnecting to broker #{@settings[:identity]} due to heartbeat timeout: " +
"last data received #{data_received.inspect} sec ago, " +
"last heartbeat received #{heartbeat_received.inspect} sec ago, " +
@@ -162,10 +168,11 @@
end
def unbind
log 'disconnected'
@connected = false
+ logger.debug("[amqp] Disconnected from broker #{@settings[:identity]}")
EM.next_tick{ @on_disconnect.call }
end
def add_channel mq
(@_channel_mutex ||= Mutex.new).synchronize do
@@ -271,10 +278,10 @@
raise ::AMQP::Error, "Could not interpret :reconnect_delay => #{again.inspect}; expected nil, true, or Numeric"
end
end
log 'reconnecting'
- logger.info("[amqp] Attempting to reconnect to #{@settings[:identity]}")
+ logger.info("[amqp] Attempting to reconnect to broker #{@settings[:identity]}")
EM.reconnect(@settings[:host], @settings[:port], self)
rescue Exception => e
logger.exception("[amqp] Failed to reconnect", e, :trace)
failed
end