lib/bunny/session.rb in bunny-2.6.3 vs lib/bunny/session.rb in bunny-2.6.4

- old
+ new

@@ -115,12 +115,13 @@ # @option connection_string_or_opts [Logger] :logger The logger. If missing, one is created using :log_file and :log_level. # @option connection_string_or_opts [IO, String] :log_file The file or path to use when creating a logger. Defaults to STDOUT. # @option connection_string_or_opts [IO, String] :logfile DEPRECATED: use :log_file instead. The file or path to use when creating a logger. Defaults to STDOUT. # @option connection_string_or_opts [Integer] :log_level The log level to use when creating a logger. Defaults to LOGGER::WARN # @option connection_string_or_opts [Boolean] :automatically_recover (true) Should automatically recover from network failures? - # @option connection_string_or_opts [Integer] :recovery_attempts (nil) Max number of recovery attempts, nil means forever, 0 means never - # @option connection_string_or_opts [Boolean] :recover_from_connection_close (true) Recover from server-sent connection.close + # @option connection_string_or_opts [Integer] :recovery_attempts (nil) Max number of recovery attempts, nil means forever + # @option connection_string_or_opts [Integer] :reset_recovery_attempts_after_reconnection (true) Should recovery attempt counter be reset after successful reconnection? When set to false, the attempt counter will last through the entire lifetime of the connection object. + # @option connection_string_or_opts [Boolean] :recover_from_connection_close (true) Should this connection recover after receiving a server-sent connection.close (e.g. connection was force closed)? # # @option optz [String] :auth_mechanism ("PLAIN") Authentication mechanism, PLAIN or EXTERNAL # @option optz [String] :locale ("PLAIN") Locale RabbitMQ should use # # @see http://rubybunny.info/articles/connecting.html Connecting to RabbitMQ guide @@ -162,11 +163,17 @@ @automatically_recover = if opts[:automatically_recover].nil? && opts[:automatic_recovery].nil? true else opts[:automatically_recover] || opts[:automatic_recovery] end - @recovery_attempts = opts[:recovery_attempts] + @max_recovery_attempts = opts[:recovery_attempts] + @recovery_attempts = @max_recovery_attempts + # When this is set, connection attempts won't be reset after + # successful reconnection. Some find this behavior more sensible + # than the per-failure attempt counter. MK. + @reset_recovery_attempt_counter_after_reconnection = opts.fetch(:reset_recovery_attempts_after_reconnection, true) + @network_recovery_interval = opts.fetch(:network_recovery_interval, DEFAULT_NETWORK_RECOVERY_INTERVAL) @recover_from_connection_close = opts.fetch(:recover_from_connection_close, true) # in ms @continuation_timeout = opts.fetch(:continuation_timeout, DEFAULT_CONTINUATION_TIMEOUT) @@ -316,10 +323,11 @@ rescue HostListDepleted self.reset_address_index @status_mutex.synchronize { @status = :not_connected } raise TCPConnectionFailedForAllHosts end + @status_mutex.synchronize { @manually_closed = false } self end # Socket operation write timeout used by this connection @@ -334,10 +342,11 @@ # opened (this operation is very fast and inexpensive). # # @return [Bunny::Channel] Newly opened channel def create_channel(n = nil, consumer_pool_size = 1, consumer_pool_abort_on_exception = false, consumer_pool_shutdown_timeout = 60) raise ArgumentError, "channel number 0 is reserved in the protocol and cannot be used" if 0 == n + raise ConnectionAlreadyClosed if manually_closed? @channel_mutex.synchronize do if n && (ch = @channels[n]) ch else @@ -360,11 +369,14 @@ self.close_connection(true) end clean_up_on_shutdown end - @status_mutex.synchronize { @status = :closed } + @status_mutex.synchronize do + @status = :closed + @manually_closed = true + end end alias stop close # Creates a temporary channel, yields it to the block given to this # method and closes it. @@ -395,10 +407,15 @@ # @return [Boolean] true if this AMQP 0.9.1 connection is closed def closed? @status_mutex.synchronize { @status == :closed } end + # @return [Boolean] true if this AMQP 0.9.1 connection has been programmatically closed + def manually_closed? + @status_mutex.synchronize { @manually_closed == true } + end + # @return [Boolean] true if this AMQP 0.9.1 connection is open def open? @status_mutex.synchronize do (status == :open || status == :connected || status == :connecting) && @transport.open? end @@ -638,69 +655,110 @@ if !recovering_from_network_failure? begin @recovering_from_network_failure = true if recoverable_network_failure?(exception) - @logger.warn "Recovering from a network failure..." + announce_network_failure_recovery @channel_mutex.synchronize do @channels.each do |n, ch| ch.maybe_kill_consumer_work_pool! end end @reader_loop.stop if @reader_loop maybe_shutdown_heartbeat_sender recover_from_network_failure else - # TODO: investigate if we can be a bit smarter here. MK. + @logger.error "Exception #{exception.message} is considered unrecoverable..." end ensure @recovering_from_network_failure = false end end end # @private def recoverable_network_failure?(exception) - # TODO: investigate if we can be a bit smarter here. MK. + # No reasonably smart strategy was suggested in a few years. + # So just recover unconditionally. MK. true end # @private def recovering_from_network_failure? @recovering_from_network_failure end # @private + def announce_network_failure_recovery + if recovery_attempts_limited? + @logger.warn "Will recover from a network failure (#{@recovery_attempts} out of #{@max_recovery_attempts} left)..." + else + @logger.warn "Will recover from a network failure (no retry limit)..." + end + end + + # @private def recover_from_network_failure sleep @network_recovery_interval - @logger.debug "About to start connection recovery..." + @logger.debug "Will attempt connection recovery..." self.initialize_transport @logger.warn "Retrying connection on next host in line: #{@transport.host}:#{@transport.port}" self.start if open? + @recovering_from_network_failure = false + @logger.debug "Connection is now open" + if @reset_recovery_attempt_counter_after_reconnection + @logger.debug "Resetting recovery attempt counter after successful reconnection" + reset_recovery_attempt_counter! + else + @logger.debug "Not resetting recovery attempt counter after successful reconnection, as configured" + end + reset_recovery_attempt_counter! recover_channels end rescue HostListDepleted reset_address_index retry rescue TCPConnectionFailedForAllHosts, TCPConnectionFailed, AMQ::Protocol::EmptyResponseError => e @logger.warn "TCP connection failed, reconnecting in #{@network_recovery_interval} seconds" sleep @network_recovery_interval if should_retry_recovery? - @recovery_attempts -= 1 if @recovery_attempts - retry if recoverable_network_failure?(e) + decrement_recovery_attemp_counter! + if recoverable_network_failure?(e) + announce_network_failure_recovery + retry + end + else + @logger.error "Ran out of recovery attempts (limit set to #{@max_recovery_attempts})" end end # @private + def recovery_attempts_limited? + !!@max_recovery_attempts + end + + # @private def should_retry_recovery? - @recovery_attempts.nil? || @recovery_attempts > 1 + !recovery_attempts_limited? || @recovery_attempts > 1 + end + + # @private + def decrement_recovery_attemp_counter! + @recovery_attempts -= 1 if @recovery_attempts + @logger.debug "#{@recovery_attempts} recovery attempts left" + @recovery_attempts + end + + # @private + def reset_recovery_attempt_counter! + @recovery_attempts = @max_recovery_attempts end # @private def recover_channels @channel_mutex.synchronize do