lib/bunny/session.rb in bunny-2.7.0 vs lib/bunny/session.rb in bunny-2.7.1
- old
+ new
@@ -99,20 +99,22 @@
# @option connection_string_or_opts [Array<String>] :addresses (["127.0.0.1:5672"]) list of addresses to select hostname and port from when connecting
# @option connection_string_or_opts [Integer] :port (5672) Port RabbitMQ listens on
# @option connection_string_or_opts [String] :username ("guest") Username
# @option connection_string_or_opts [String] :password ("guest") Password
# @option connection_string_or_opts [String] :vhost ("/") Virtual host to use
- # @option connection_string_or_opts [Integer] :heartbeat (600) Heartbeat interval. 0 means no heartbeat.
+ # @option connection_string_or_opts [Integer, Symbol] :heartbeat (:server) Heartbeat interval. :server means use the default suggested by RabbitMQ. 0 means no heartbeat (not recommended).
# @option connection_string_or_opts [Integer] :network_recovery_interval (4) Recovery interval periodic network recovery will use. This includes initial pause after network failure.
# @option connection_string_or_opts [Boolean] :tls (false) Should TLS/SSL be used?
# @option connection_string_or_opts [String] :tls_cert (nil) Path to client TLS/SSL certificate file (.pem)
# @option connection_string_or_opts [String] :tls_key (nil) Path to client TLS/SSL private key file (.pem)
# @option connection_string_or_opts [Array<String>] :tls_ca_certificates Array of paths to TLS/SSL CA files (.pem), by default detected from OpenSSL configuration
# @option connection_string_or_opts [String] :verify_peer (true) Whether TLS peer verification should be performed
- # @option connection_string_or_opts [Keyword] :tls_version (negotiated) What TLS version should be used (:TLSv1, :TLSv1_1, or :TLSv1_2)
+ # @option connection_string_or_opts [Symbol] :tls_version (negotiated) What TLS version should be used (:TLSv1, :TLSv1_1, or :TLSv1_2)
# @option connection_string_or_opts [Integer] :continuation_timeout (15000) Timeout for client operations that expect a response (e.g. {Bunny::Queue#get}), in milliseconds.
- # @option connection_string_or_opts [Integer] :connection_timeout (5) Timeout in seconds for connecting to the server.
+ # @option connection_string_or_opts [Integer] :connection_timeout (30) Timeout in seconds for connecting to the server.
+ # @option connection_string_or_opts [Integer] :read_timeout (30) TCP socket read timeout in seconds.
+ # @option connection_string_or_opts [Integer] :write_timeout (30) TCP socket write timeout in seconds.
# @option connection_string_or_opts [Proc] :hosts_shuffle_strategy A Proc that reorders a list of host strings, defaults to Array#shuffle
# @option connection_string_or_opts [Logger] :logger The logger. If missing, one is created using :log_file and :log_level.
# @option connection_string_or_opts [IO, String] :log_file The file or path to use when creating a logger. Defaults to STDOUT.
# @option connection_string_or_opts [IO, String] :logfile DEPRECATED: use :log_file instead. The file or path to use when creating a logger. Defaults to STDOUT.
# @option connection_string_or_opts [Integer] :log_level The log level to use when creating a logger. Defaults to LOGGER::WARN
@@ -344,10 +346,11 @@
#
# @return [Bunny::Channel] Newly opened channel
def create_channel(n = nil, consumer_pool_size = 1, consumer_pool_abort_on_exception = false, consumer_pool_shutdown_timeout = 60)
raise ArgumentError, "channel number 0 is reserved in the protocol and cannot be used" if 0 == n
raise ConnectionAlreadyClosed if manually_closed?
+ raise RuntimeError, "this connection is not open. Was Bunny::Session#start invoked? Is automatic recovery enabled?" if !connected?
@channel_mutex.synchronize do
if n && (ch = @channels[n])
ch
else
@@ -539,10 +542,20 @@
@last_channel_close_ok
end
end
# @private
+ def find_channel(number)
+ @channels[number]
+ end
+
+ # @private
+ def synchronised_find_channel(number)
+ @channel_mutex.synchronize { @channels[number] }
+ end
+
+ # @private
def close_all_channels
@channel_mutex.synchronize do
@channels.reject {|n, ch| n == 0 || !ch.open? }.each do |_, ch|
Bunny::Timeout.timeout(@transport.disconnect_timeout, ClientTimeout) { ch.close }
end
@@ -604,19 +617,24 @@
when AMQ::Protocol::Connection::Unblocked then
@blocked = false
@unblock_callback.call(method) if @unblock_callback
when AMQ::Protocol::Channel::Close then
begin
- ch = @channels[ch_number]
+ ch = synchronised_find_channel(ch_number)
+ # this includes sending a channel.close-ok and
+ # potentially invoking a user-provided callback,
+ # avoid doing that while holding a mutex lock. MK.
ch.handle_method(method)
ensure
+ # synchronises on @channel_mutex under the hood
self.unregister_channel(ch)
end
when AMQ::Protocol::Basic::GetEmpty then
- @channels[ch_number].handle_basic_get_empty(method)
+ ch = find_channel(ch_number)
+ ch.handle_basic_get_empty(method)
else
- if ch = @channels[ch_number]
+ if ch = find_channel(ch_number)
ch.handle_method(method)
else
@logger.warn "Channel #{ch_number} is not open on this connection!"
end
end
@@ -1159,14 +1177,21 @@
@logger.debug { "Heartbeat interval negotiation: client = #{@client_heartbeat}, server = #{connection_tune.heartbeat}, result = #{@heartbeat}" }
@logger.info "Heartbeat interval used (in seconds): #{@heartbeat}"
# We set the read_write_timeout to twice the heartbeat value
# This allows us to miss a single heartbeat before we time out the socket.
- @transport.read_timeout = if heartbeat_disabled?(@client_heartbeat)
+ #
+ # Since RabbitMQ can be configured to disable heartbeats (bad idea but technically
+ # possible nonetheless), we need to take both client and server values into
+ # consideration when deciding about using the heartbeat value for read timeouts.
+ @transport.read_timeout = if heartbeat_disabled?(@client_heartbeat) || heartbeat_disabled?(@heartbeat)
+ @logger.debug { "Will use default socket read timeout of #{Transport::DEFAULT_READ_TIMEOUT}" }
Transport::DEFAULT_READ_TIMEOUT
else
# pad to account for edge cases. MK.
- @heartbeat * 2.2
+ n = @heartbeat * 2.2
+ @logger.debug { "Will use socket read timeout of #{n}" }
+ n
end
# if there are existing channels we've just recovered from
# a network failure and need to fix the allocated set. See issue 205. MK.