lib/bunny/session.rb in bunny-1.5.1 vs lib/bunny/session.rb in bunny-1.6.0.pre1
- old
+ new
@@ -152,11 +152,11 @@
opts[:automatically_recover] || opts[:automatic_recovery]
end
@network_recovery_interval = opts.fetch(:network_recovery_interval, DEFAULT_NETWORK_RECOVERY_INTERVAL)
@recover_from_connection_close = opts.fetch(:recover_from_connection_close, false)
# in ms
- @continuation_timeout = opts.fetch(:continuation_timeout, DEFAULT_CONTINUATION_TIMEOUT)
+ @continuation_timeout = opts.fetch(:continuation_timeout, DEFAULT_CONTINUATION_TIMEOUT)
@status = :not_connected
@blocked = false
# these are negotiated with the broker during the connection tuning phase
@@ -277,13 +277,15 @@
@reader_loop = nil
self.start_reader_loop if threaded?
rescue TCPConnectionFailed => e
- self.initialize_transport
@logger.warn e.message
+
+ self.initialize_transport
+
@logger.warn "Retrying connection on next host in line: #{@transport.host}:#{@transport.port}"
return self.start
rescue
@status_mutex.synchronize { @status = :not_connected }
@@ -297,15 +299,15 @@
end
self
end
- # Socket operation timeout used by this connection
+ # Socket operation write timeout used by this connection
# @return [Integer]
# @private
- def read_write_timeout
- @transport.read_write_timeout
+ def transport_write_timeout
+ @transport.write_timeout
end
# Opens a new channel and returns it. This method will block the calling
# thread until the response is received and the channel is guaranteed to be
# opened (this operation is very fast and inexpensive).
@@ -643,19 +645,23 @@
def recover_from_network_failure
begin
sleep @network_recovery_interval
@logger.debug "About to start connection recovery..."
- self.reset_host_index # since we are starting a fresh try.
self.initialize_transport
+
+ @logger.warn "Retrying connection on next host in line: #{@transport.host}:#{@transport.port}"
self.start
if open?
@recovering_from_network_failure = false
recover_channels
end
+ rescue HostListDepleted
+ reset_host_index
+ retry
rescue TCPConnectionFailedForAllHosts, TCPConnectionFailed, AMQ::Protocol::EmptyResponseError => e
@logger.warn "TCP connection failed, reconnecting in #{@network_recovery_interval} seconds"
sleep @network_recovery_interval
retry if recoverable_network_failure?(e)
end
@@ -988,10 +994,14 @@
negotiate_value(@client_heartbeat, connection_tune.heartbeat)
end
@logger.debug "Heartbeat interval negotiation: client = #{@client_heartbeat}, server = #{connection_tune.heartbeat}, result = #{@heartbeat}"
@logger.info "Heartbeat interval used (in seconds): #{@heartbeat}"
+ # We set the read_write_timeout to twice the heartbeat value
+ # This allows us to miss a single heartbeat before we time out the socket.
+ @transport.read_timeout = @heartbeat * 2
+
# if there are existing channels we've just recovered from
# a network failure and need to fix the allocated set. See issue 205. MK.
if @channels.empty?
@channel_id_allocator = ChannelIdAllocator.new(@channel_max)
end
@@ -1077,10 +1087,16 @@
# @private
def initialize_transport
if host = @hosts[ @host_index ]
@host_index_mutex.synchronize { @host_index += 1 }
+ @transport.close rescue nil # Let's make sure the previous transport socket is closed
@transport = Transport.new(self, host, @port, @opts.merge(:session_thread => @origin_thread))
+
+ # Reset the cached progname for the logger
+ @logger.progname = to_s if @logger.respond_to?(:progname)
+
+ @transport
else
raise HostListDepleted
end
end