lib/bunny/session.rb in bunny-2.6.3 vs lib/bunny/session.rb in bunny-2.6.4
- old
+ new
@@ -115,12 +115,13 @@
# @option connection_string_or_opts [Logger] :logger The logger. If missing, one is created using :log_file and :log_level.
# @option connection_string_or_opts [IO, String] :log_file The file or path to use when creating a logger. Defaults to STDOUT.
# @option connection_string_or_opts [IO, String] :logfile DEPRECATED: use :log_file instead. The file or path to use when creating a logger. Defaults to STDOUT.
# @option connection_string_or_opts [Integer] :log_level The log level to use when creating a logger. Defaults to LOGGER::WARN
# @option connection_string_or_opts [Boolean] :automatically_recover (true) Should automatically recover from network failures?
- # @option connection_string_or_opts [Integer] :recovery_attempts (nil) Max number of recovery attempts, nil means forever, 0 means never
- # @option connection_string_or_opts [Boolean] :recover_from_connection_close (true) Recover from server-sent connection.close
+ # @option connection_string_or_opts [Integer] :recovery_attempts (nil) Max number of recovery attempts, nil means forever
+ # @option connection_string_or_opts [Integer] :reset_recovery_attempts_after_reconnection (true) Should recovery attempt counter be reset after successful reconnection? When set to false, the attempt counter will last through the entire lifetime of the connection object.
+ # @option connection_string_or_opts [Boolean] :recover_from_connection_close (true) Should this connection recover after receiving a server-sent connection.close (e.g. connection was force closed)?
#
# @option optz [String] :auth_mechanism ("PLAIN") Authentication mechanism, PLAIN or EXTERNAL
# @option optz [String] :locale ("PLAIN") Locale RabbitMQ should use
#
# @see http://rubybunny.info/articles/connecting.html Connecting to RabbitMQ guide
@@ -162,11 +163,17 @@
@automatically_recover = if opts[:automatically_recover].nil? && opts[:automatic_recovery].nil?
true
else
opts[:automatically_recover] || opts[:automatic_recovery]
end
- @recovery_attempts = opts[:recovery_attempts]
+ @max_recovery_attempts = opts[:recovery_attempts]
+ @recovery_attempts = @max_recovery_attempts
+ # When this is set, connection attempts won't be reset after
+ # successful reconnection. Some find this behavior more sensible
+ # than the per-failure attempt counter. MK.
+ @reset_recovery_attempt_counter_after_reconnection = opts.fetch(:reset_recovery_attempts_after_reconnection, true)
+
@network_recovery_interval = opts.fetch(:network_recovery_interval, DEFAULT_NETWORK_RECOVERY_INTERVAL)
@recover_from_connection_close = opts.fetch(:recover_from_connection_close, true)
# in ms
@continuation_timeout = opts.fetch(:continuation_timeout, DEFAULT_CONTINUATION_TIMEOUT)
@@ -316,10 +323,11 @@
rescue HostListDepleted
self.reset_address_index
@status_mutex.synchronize { @status = :not_connected }
raise TCPConnectionFailedForAllHosts
end
+ @status_mutex.synchronize { @manually_closed = false }
self
end
# Socket operation write timeout used by this connection
@@ -334,10 +342,11 @@
# opened (this operation is very fast and inexpensive).
#
# @return [Bunny::Channel] Newly opened channel
def create_channel(n = nil, consumer_pool_size = 1, consumer_pool_abort_on_exception = false, consumer_pool_shutdown_timeout = 60)
raise ArgumentError, "channel number 0 is reserved in the protocol and cannot be used" if 0 == n
+ raise ConnectionAlreadyClosed if manually_closed?
@channel_mutex.synchronize do
if n && (ch = @channels[n])
ch
else
@@ -360,11 +369,14 @@
self.close_connection(true)
end
clean_up_on_shutdown
end
- @status_mutex.synchronize { @status = :closed }
+ @status_mutex.synchronize do
+ @status = :closed
+ @manually_closed = true
+ end
end
alias stop close
# Creates a temporary channel, yields it to the block given to this
# method and closes it.
@@ -395,10 +407,15 @@
# @return [Boolean] true if this AMQP 0.9.1 connection is closed
def closed?
@status_mutex.synchronize { @status == :closed }
end
+ # @return [Boolean] true if this AMQP 0.9.1 connection has been programmatically closed
+ def manually_closed?
+ @status_mutex.synchronize { @manually_closed == true }
+ end
+
# @return [Boolean] true if this AMQP 0.9.1 connection is open
def open?
@status_mutex.synchronize do
(status == :open || status == :connected || status == :connecting) && @transport.open?
end
@@ -638,69 +655,110 @@
if !recovering_from_network_failure?
begin
@recovering_from_network_failure = true
if recoverable_network_failure?(exception)
- @logger.warn "Recovering from a network failure..."
+ announce_network_failure_recovery
@channel_mutex.synchronize do
@channels.each do |n, ch|
ch.maybe_kill_consumer_work_pool!
end
end
@reader_loop.stop if @reader_loop
maybe_shutdown_heartbeat_sender
recover_from_network_failure
else
- # TODO: investigate if we can be a bit smarter here. MK.
+ @logger.error "Exception #{exception.message} is considered unrecoverable..."
end
ensure
@recovering_from_network_failure = false
end
end
end
# @private
def recoverable_network_failure?(exception)
- # TODO: investigate if we can be a bit smarter here. MK.
+ # No reasonably smart strategy was suggested in a few years.
+ # So just recover unconditionally. MK.
true
end
# @private
def recovering_from_network_failure?
@recovering_from_network_failure
end
# @private
+ def announce_network_failure_recovery
+ if recovery_attempts_limited?
+ @logger.warn "Will recover from a network failure (#{@recovery_attempts} out of #{@max_recovery_attempts} left)..."
+ else
+ @logger.warn "Will recover from a network failure (no retry limit)..."
+ end
+ end
+
+ # @private
def recover_from_network_failure
sleep @network_recovery_interval
- @logger.debug "About to start connection recovery..."
+ @logger.debug "Will attempt connection recovery..."
self.initialize_transport
@logger.warn "Retrying connection on next host in line: #{@transport.host}:#{@transport.port}"
self.start
if open?
+
@recovering_from_network_failure = false
+ @logger.debug "Connection is now open"
+ if @reset_recovery_attempt_counter_after_reconnection
+ @logger.debug "Resetting recovery attempt counter after successful reconnection"
+ reset_recovery_attempt_counter!
+ else
+ @logger.debug "Not resetting recovery attempt counter after successful reconnection, as configured"
+ end
+ reset_recovery_attempt_counter!
recover_channels
end
rescue HostListDepleted
reset_address_index
retry
rescue TCPConnectionFailedForAllHosts, TCPConnectionFailed, AMQ::Protocol::EmptyResponseError => e
@logger.warn "TCP connection failed, reconnecting in #{@network_recovery_interval} seconds"
sleep @network_recovery_interval
if should_retry_recovery?
- @recovery_attempts -= 1 if @recovery_attempts
- retry if recoverable_network_failure?(e)
+ decrement_recovery_attemp_counter!
+ if recoverable_network_failure?(e)
+ announce_network_failure_recovery
+ retry
+ end
+ else
+ @logger.error "Ran out of recovery attempts (limit set to #{@max_recovery_attempts})"
end
end
# @private
+ def recovery_attempts_limited?
+ !!@max_recovery_attempts
+ end
+
+ # @private
def should_retry_recovery?
- @recovery_attempts.nil? || @recovery_attempts > 1
+ !recovery_attempts_limited? || @recovery_attempts > 1
+ end
+
+ # @private
+ def decrement_recovery_attemp_counter!
+ @recovery_attempts -= 1 if @recovery_attempts
+ @logger.debug "#{@recovery_attempts} recovery attempts left"
+ @recovery_attempts
+ end
+
+ # @private
+ def reset_recovery_attempt_counter!
+ @recovery_attempts = @max_recovery_attempts
end
# @private
def recover_channels
@channel_mutex.synchronize do