lib/bunny/session.rb in bunny-2.7.0 vs lib/bunny/session.rb in bunny-2.7.1

- old
+ new

@@ -99,20 +99,22 @@ # @option connection_string_or_opts [Array<String>] :addresses (["127.0.0.1:5672"]) list of addresses to select hostname and port from when connecting # @option connection_string_or_opts [Integer] :port (5672) Port RabbitMQ listens on # @option connection_string_or_opts [String] :username ("guest") Username # @option connection_string_or_opts [String] :password ("guest") Password # @option connection_string_or_opts [String] :vhost ("/") Virtual host to use - # @option connection_string_or_opts [Integer] :heartbeat (600) Heartbeat interval. 0 means no heartbeat. + # @option connection_string_or_opts [Integer, Symbol] :heartbeat (:server) Heartbeat interval. :server means use the default suggested by RabbitMQ. 0 means no heartbeat (not recommended). # @option connection_string_or_opts [Integer] :network_recovery_interval (4) Recovery interval periodic network recovery will use. This includes initial pause after network failure. # @option connection_string_or_opts [Boolean] :tls (false) Should TLS/SSL be used? # @option connection_string_or_opts [String] :tls_cert (nil) Path to client TLS/SSL certificate file (.pem) # @option connection_string_or_opts [String] :tls_key (nil) Path to client TLS/SSL private key file (.pem) # @option connection_string_or_opts [Array<String>] :tls_ca_certificates Array of paths to TLS/SSL CA files (.pem), by default detected from OpenSSL configuration # @option connection_string_or_opts [String] :verify_peer (true) Whether TLS peer verification should be performed - # @option connection_string_or_opts [Keyword] :tls_version (negotiated) What TLS version should be used (:TLSv1, :TLSv1_1, or :TLSv1_2) + # @option connection_string_or_opts [Symbol] :tls_version (negotiated) What TLS version should be used (:TLSv1, :TLSv1_1, or :TLSv1_2) # @option connection_string_or_opts [Integer] :continuation_timeout (15000) Timeout for client operations that expect a response (e.g. {Bunny::Queue#get}), in milliseconds. - # @option connection_string_or_opts [Integer] :connection_timeout (5) Timeout in seconds for connecting to the server. + # @option connection_string_or_opts [Integer] :connection_timeout (30) Timeout in seconds for connecting to the server. + # @option connection_string_or_opts [Integer] :read_timeout (30) TCP socket read timeout in seconds. + # @option connection_string_or_opts [Integer] :write_timeout (30) TCP socket write timeout in seconds. # @option connection_string_or_opts [Proc] :hosts_shuffle_strategy A Proc that reorders a list of host strings, defaults to Array#shuffle # @option connection_string_or_opts [Logger] :logger The logger. If missing, one is created using :log_file and :log_level. # @option connection_string_or_opts [IO, String] :log_file The file or path to use when creating a logger. Defaults to STDOUT. # @option connection_string_or_opts [IO, String] :logfile DEPRECATED: use :log_file instead. The file or path to use when creating a logger. Defaults to STDOUT. # @option connection_string_or_opts [Integer] :log_level The log level to use when creating a logger. Defaults to LOGGER::WARN @@ -344,10 +346,11 @@ # # @return [Bunny::Channel] Newly opened channel def create_channel(n = nil, consumer_pool_size = 1, consumer_pool_abort_on_exception = false, consumer_pool_shutdown_timeout = 60) raise ArgumentError, "channel number 0 is reserved in the protocol and cannot be used" if 0 == n raise ConnectionAlreadyClosed if manually_closed? + raise RuntimeError, "this connection is not open. Was Bunny::Session#start invoked? Is automatic recovery enabled?" if !connected? @channel_mutex.synchronize do if n && (ch = @channels[n]) ch else @@ -539,10 +542,20 @@ @last_channel_close_ok end end # @private + def find_channel(number) + @channels[number] + end + + # @private + def synchronised_find_channel(number) + @channel_mutex.synchronize { @channels[number] } + end + + # @private def close_all_channels @channel_mutex.synchronize do @channels.reject {|n, ch| n == 0 || !ch.open? }.each do |_, ch| Bunny::Timeout.timeout(@transport.disconnect_timeout, ClientTimeout) { ch.close } end @@ -604,19 +617,24 @@ when AMQ::Protocol::Connection::Unblocked then @blocked = false @unblock_callback.call(method) if @unblock_callback when AMQ::Protocol::Channel::Close then begin - ch = @channels[ch_number] + ch = synchronised_find_channel(ch_number) + # this includes sending a channel.close-ok and + # potentially invoking a user-provided callback, + # avoid doing that while holding a mutex lock. MK. ch.handle_method(method) ensure + # synchronises on @channel_mutex under the hood self.unregister_channel(ch) end when AMQ::Protocol::Basic::GetEmpty then - @channels[ch_number].handle_basic_get_empty(method) + ch = find_channel(ch_number) + ch.handle_basic_get_empty(method) else - if ch = @channels[ch_number] + if ch = find_channel(ch_number) ch.handle_method(method) else @logger.warn "Channel #{ch_number} is not open on this connection!" end end @@ -1159,14 +1177,21 @@ @logger.debug { "Heartbeat interval negotiation: client = #{@client_heartbeat}, server = #{connection_tune.heartbeat}, result = #{@heartbeat}" } @logger.info "Heartbeat interval used (in seconds): #{@heartbeat}" # We set the read_write_timeout to twice the heartbeat value # This allows us to miss a single heartbeat before we time out the socket. - @transport.read_timeout = if heartbeat_disabled?(@client_heartbeat) + # + # Since RabbitMQ can be configured to disable heartbeats (bad idea but technically + # possible nonetheless), we need to take both client and server values into + # consideration when deciding about using the heartbeat value for read timeouts. + @transport.read_timeout = if heartbeat_disabled?(@client_heartbeat) || heartbeat_disabled?(@heartbeat) + @logger.debug { "Will use default socket read timeout of #{Transport::DEFAULT_READ_TIMEOUT}" } Transport::DEFAULT_READ_TIMEOUT else # pad to account for edge cases. MK. - @heartbeat * 2.2 + n = @heartbeat * 2.2 + @logger.debug { "Will use socket read timeout of #{n}" } + n end # if there are existing channels we've just recovered from # a network failure and need to fix the allocated set. See issue 205. MK.