lib/bunny/session.rb in bunny-1.5.1 vs lib/bunny/session.rb in bunny-1.6.0.pre1

- old
+ new

@@ -152,11 +152,11 @@ opts[:automatically_recover] || opts[:automatic_recovery] end @network_recovery_interval = opts.fetch(:network_recovery_interval, DEFAULT_NETWORK_RECOVERY_INTERVAL) @recover_from_connection_close = opts.fetch(:recover_from_connection_close, false) # in ms - @continuation_timeout = opts.fetch(:continuation_timeout, DEFAULT_CONTINUATION_TIMEOUT) + @continuation_timeout = opts.fetch(:continuation_timeout, DEFAULT_CONTINUATION_TIMEOUT) @status = :not_connected @blocked = false # these are negotiated with the broker during the connection tuning phase @@ -277,13 +277,15 @@ @reader_loop = nil self.start_reader_loop if threaded? rescue TCPConnectionFailed => e - self.initialize_transport @logger.warn e.message + + self.initialize_transport + @logger.warn "Retrying connection on next host in line: #{@transport.host}:#{@transport.port}" return self.start rescue @status_mutex.synchronize { @status = :not_connected } @@ -297,15 +299,15 @@ end self end - # Socket operation timeout used by this connection + # Socket operation write timeout used by this connection # @return [Integer] # @private - def read_write_timeout - @transport.read_write_timeout + def transport_write_timeout + @transport.write_timeout end # Opens a new channel and returns it. This method will block the calling # thread until the response is received and the channel is guaranteed to be # opened (this operation is very fast and inexpensive). @@ -643,19 +645,23 @@ def recover_from_network_failure begin sleep @network_recovery_interval @logger.debug "About to start connection recovery..." - self.reset_host_index # since we are starting a fresh try. self.initialize_transport + + @logger.warn "Retrying connection on next host in line: #{@transport.host}:#{@transport.port}" self.start if open? @recovering_from_network_failure = false recover_channels end + rescue HostListDepleted + reset_host_index + retry rescue TCPConnectionFailedForAllHosts, TCPConnectionFailed, AMQ::Protocol::EmptyResponseError => e @logger.warn "TCP connection failed, reconnecting in #{@network_recovery_interval} seconds" sleep @network_recovery_interval retry if recoverable_network_failure?(e) end @@ -988,10 +994,14 @@ negotiate_value(@client_heartbeat, connection_tune.heartbeat) end @logger.debug "Heartbeat interval negotiation: client = #{@client_heartbeat}, server = #{connection_tune.heartbeat}, result = #{@heartbeat}" @logger.info "Heartbeat interval used (in seconds): #{@heartbeat}" + # We set the read_write_timeout to twice the heartbeat value + # This allows us to miss a single heartbeat before we time out the socket. + @transport.read_timeout = @heartbeat * 2 + # if there are existing channels we've just recovered from # a network failure and need to fix the allocated set. See issue 205. MK. if @channels.empty? @channel_id_allocator = ChannelIdAllocator.new(@channel_max) end @@ -1077,10 +1087,16 @@ # @private def initialize_transport if host = @hosts[ @host_index ] @host_index_mutex.synchronize { @host_index += 1 } + @transport.close rescue nil # Let's make sure the previous transport socket is closed @transport = Transport.new(self, host, @port, @opts.merge(:session_thread => @origin_thread)) + + # Reset the cached progname for the logger + @logger.progname = to_s if @logger.respond_to?(:progname) + + @transport else raise HostListDepleted end end