lib/bunny/session.rb in bunny-2.9.2 vs lib/bunny/session.rb in bunny-2.10.0
- old
+ new
@@ -34,14 +34,14 @@
DEFAULT_PASSWORD = "guest"
# Default heartbeat interval, the same value as RabbitMQ 3.0 uses.
DEFAULT_HEARTBEAT = :server
# @private
DEFAULT_FRAME_MAX = 131072
- # 2^16 - 1, maximum representable signed 16 bit integer.
+ # Hard limit the user cannot go over regardless of server configuration.
# @private
CHANNEL_MAX_LIMIT = 65535
- DEFAULT_CHANNEL_MAX = CHANNEL_MAX_LIMIT
+ DEFAULT_CHANNEL_MAX = 2047
# backwards compatibility
# @private
CONNECT_TIMEOUT = Transport::DEFAULT_CONNECTION_TIMEOUT
@@ -99,21 +99,22 @@
# @option connection_string_or_opts [Array<String>] :addresses (["127.0.0.1:5672"]) list of addresses to select hostname and port from when connecting
# @option connection_string_or_opts [Integer] :port (5672) Port RabbitMQ listens on
# @option connection_string_or_opts [String] :username ("guest") Username
# @option connection_string_or_opts [String] :password ("guest") Password
# @option connection_string_or_opts [String] :vhost ("/") Virtual host to use
- # @option connection_string_or_opts [Integer, Symbol] :heartbeat (:server) Heartbeat interval. :server means use the default suggested by RabbitMQ. 0 means no heartbeat (not recommended).
+ # @option connection_string_or_opts [Integer, Symbol] :heartbeat (:server) Heartbeat interval. :server means use the default suggested by RabbitMQ. 0 means heartbeats and socket read timeouts will be disabled (not recommended).
# @option connection_string_or_opts [Integer] :network_recovery_interval (4) Recovery interval periodic network recovery will use. This includes initial pause after network failure.
# @option connection_string_or_opts [Boolean] :tls (false) Should TLS/SSL be used?
# @option connection_string_or_opts [String] :tls_cert (nil) Path to client TLS/SSL certificate file (.pem)
# @option connection_string_or_opts [String] :tls_key (nil) Path to client TLS/SSL private key file (.pem)
# @option connection_string_or_opts [Array<String>] :tls_ca_certificates Array of paths to TLS/SSL CA files (.pem), by default detected from OpenSSL configuration
# @option connection_string_or_opts [String] :verify_peer (true) Whether TLS peer verification should be performed
# @option connection_string_or_opts [Symbol] :tls_version (negotiated) What TLS version should be used (:TLSv1, :TLSv1_1, or :TLSv1_2)
+ # @option connection_string_or_opts [Integer] :channel_max (2047) Maximum number of channels allowed on this connection, minus 1 to account for the special channel 0.
# @option connection_string_or_opts [Integer] :continuation_timeout (15000) Timeout for client operations that expect a response (e.g. {Bunny::Queue#get}), in milliseconds.
# @option connection_string_or_opts [Integer] :connection_timeout (30) Timeout in seconds for connecting to the server.
- # @option connection_string_or_opts [Integer] :read_timeout (30) TCP socket read timeout in seconds.
+ # @option connection_string_or_opts [Integer] :read_timeout (30) TCP socket read timeout in seconds. If heartbeats are disabled this will be ignored.
# @option connection_string_or_opts [Integer] :write_timeout (30) TCP socket write timeout in seconds.
# @option connection_string_or_opts [Proc] :hosts_shuffle_strategy A Proc that reorders a list of host strings, defaults to Array#shuffle
# @option connection_string_or_opts [Logger] :logger The logger. If missing, one is created using :log_file and :log_level.
# @option connection_string_or_opts [IO, String] :log_file The file or path to use when creating a logger. Defaults to STDOUT.
# @option connection_string_or_opts [IO, String] :logfile DEPRECATED: use :log_file instead. The file or path to use when creating a logger. Defaults to STDOUT.
@@ -1066,11 +1067,13 @@
def send_frameset(frames, channel)
# some developers end up sharing channels between threads and when multiple
# threads publish on the same channel aggressively, at some point frames will be
# delivered out of order and broker will raise 505 UNEXPECTED_FRAME exception.
# If we synchronize on the channel, however, this is both thread safe and pretty fine-grained
- # locking. Note that "single frame" methods do not need this kind of synchronization. MK.
+ # locking. Note that "single frame" methods technically do not need this kind of synchronization
+ # (no incorrect frame interleaving of the same kind as with basic.publish isn't possible) but we
+ # still recommend not sharing channels between threads except for consumer-only cases in the docs. MK.
channel.synchronize do
# see rabbitmq/rabbitmq-server#156
data = frames.reduce("") { |acc, frame| acc << frame.encode }
@transport.write(data)
signal_activity!
@@ -1085,11 +1088,11 @@
def send_frameset_without_timeout(frames, channel)
# some developers end up sharing channels between threads and when multiple
# threads publish on the same channel aggressively, at some point frames will be
# delivered out of order and broker will raise 505 UNEXPECTED_FRAME exception.
# If we synchronize on the channel, however, this is both thread safe and pretty fine-grained
- # locking. Note that "single frame" methods do not need this kind of synchronization. MK.
+ # locking. See a note about "single frame" methods in a comment in `send_frameset`. MK.
channel.synchronize do
frames.each { |frame| self.send_frame_without_timeout(frame, false) }
signal_activity!
end
end # send_frameset_without_timeout(frames)
@@ -1175,29 +1178,21 @@
negotiate_value(@client_heartbeat, connection_tune.heartbeat)
end
@logger.debug { "Heartbeat interval negotiation: client = #{@client_heartbeat}, server = #{connection_tune.heartbeat}, result = #{@heartbeat}" }
@logger.info "Heartbeat interval used (in seconds): #{@heartbeat}"
- # We set the read_write_timeout to twice the heartbeat value
+ # We set the read_write_timeout to twice the heartbeat value,
+ # and then some padding for edge cases.
# This allows us to miss a single heartbeat before we time out the socket.
- #
- # Since RabbitMQ can be configured to disable heartbeats (bad idea but technically
- # possible nonetheless), we need to take both client and server values into
- # consideration when deciding about using the heartbeat value for read timeouts.
- @transport.read_timeout = if heartbeat_disabled?(@client_heartbeat) || heartbeat_disabled?(@heartbeat)
- @logger.debug { "Will use default socket read timeout of #{Transport::DEFAULT_READ_TIMEOUT}" }
- Transport::DEFAULT_READ_TIMEOUT
- else
- # pad to account for edge cases. MK.
- n = @heartbeat * 2.2
- @logger.debug { "Will use socket read timeout of #{n}" }
- n
- end
+ # If heartbeats are disabled, assume that TCP keepalives or a similar mechanism will be used
+ # and disable socket read timeouts. See ruby-amqp/bunny#551.
+ @transport.read_timeout = @heartbeat * 2.2
+ @logger.debug { "Will use socket read timeout of #{@transport.read_timeout}" }
-
# if there are existing channels we've just recovered from
# a network failure and need to fix the allocated set. See issue 205. MK.
if @channels.empty?
+ @logger.debug { "Initializing channel ID allocator with channel_max = #{@channel_max}" }
@channel_id_allocator = ChannelIdAllocator.new(@channel_max)
end
@transport.send_frame(AMQ::Protocol::Connection::TuneOk.encode(@channel_max, @frame_max, @heartbeat))
@logger.debug { "Sent connection.tune-ok with heartbeat interval = #{@heartbeat}, frame_max = #{@frame_max}, channel_max = #{@channel_max}" }