lib/bunny/session.rb in bunny-2.9.2 vs lib/bunny/session.rb in bunny-2.10.0

- old
+ new

@@ -34,14 +34,14 @@ DEFAULT_PASSWORD = "guest" # Default heartbeat interval, the same value as RabbitMQ 3.0 uses. DEFAULT_HEARTBEAT = :server # @private DEFAULT_FRAME_MAX = 131072 - # 2^16 - 1, maximum representable signed 16 bit integer. + # Hard limit the user cannot go over regardless of server configuration. # @private CHANNEL_MAX_LIMIT = 65535 - DEFAULT_CHANNEL_MAX = CHANNEL_MAX_LIMIT + DEFAULT_CHANNEL_MAX = 2047 # backwards compatibility # @private CONNECT_TIMEOUT = Transport::DEFAULT_CONNECTION_TIMEOUT @@ -99,21 +99,22 @@ # @option connection_string_or_opts [Array<String>] :addresses (["127.0.0.1:5672"]) list of addresses to select hostname and port from when connecting # @option connection_string_or_opts [Integer] :port (5672) Port RabbitMQ listens on # @option connection_string_or_opts [String] :username ("guest") Username # @option connection_string_or_opts [String] :password ("guest") Password # @option connection_string_or_opts [String] :vhost ("/") Virtual host to use - # @option connection_string_or_opts [Integer, Symbol] :heartbeat (:server) Heartbeat interval. :server means use the default suggested by RabbitMQ. 0 means no heartbeat (not recommended). + # @option connection_string_or_opts [Integer, Symbol] :heartbeat (:server) Heartbeat interval. :server means use the default suggested by RabbitMQ. 0 means heartbeats and socket read timeouts will be disabled (not recommended). # @option connection_string_or_opts [Integer] :network_recovery_interval (4) Recovery interval periodic network recovery will use. This includes initial pause after network failure. # @option connection_string_or_opts [Boolean] :tls (false) Should TLS/SSL be used? # @option connection_string_or_opts [String] :tls_cert (nil) Path to client TLS/SSL certificate file (.pem) # @option connection_string_or_opts [String] :tls_key (nil) Path to client TLS/SSL private key file (.pem) # @option connection_string_or_opts [Array<String>] :tls_ca_certificates Array of paths to TLS/SSL CA files (.pem), by default detected from OpenSSL configuration # @option connection_string_or_opts [String] :verify_peer (true) Whether TLS peer verification should be performed # @option connection_string_or_opts [Symbol] :tls_version (negotiated) What TLS version should be used (:TLSv1, :TLSv1_1, or :TLSv1_2) + # @option connection_string_or_opts [Integer] :channel_max (2047) Maximum number of channels allowed on this connection, minus 1 to account for the special channel 0. # @option connection_string_or_opts [Integer] :continuation_timeout (15000) Timeout for client operations that expect a response (e.g. {Bunny::Queue#get}), in milliseconds. # @option connection_string_or_opts [Integer] :connection_timeout (30) Timeout in seconds for connecting to the server. - # @option connection_string_or_opts [Integer] :read_timeout (30) TCP socket read timeout in seconds. + # @option connection_string_or_opts [Integer] :read_timeout (30) TCP socket read timeout in seconds. If heartbeats are disabled this will be ignored. # @option connection_string_or_opts [Integer] :write_timeout (30) TCP socket write timeout in seconds. # @option connection_string_or_opts [Proc] :hosts_shuffle_strategy A Proc that reorders a list of host strings, defaults to Array#shuffle # @option connection_string_or_opts [Logger] :logger The logger. If missing, one is created using :log_file and :log_level. # @option connection_string_or_opts [IO, String] :log_file The file or path to use when creating a logger. Defaults to STDOUT. # @option connection_string_or_opts [IO, String] :logfile DEPRECATED: use :log_file instead. The file or path to use when creating a logger. Defaults to STDOUT. @@ -1066,11 +1067,13 @@ def send_frameset(frames, channel) # some developers end up sharing channels between threads and when multiple # threads publish on the same channel aggressively, at some point frames will be # delivered out of order and broker will raise 505 UNEXPECTED_FRAME exception. # If we synchronize on the channel, however, this is both thread safe and pretty fine-grained - # locking. Note that "single frame" methods do not need this kind of synchronization. MK. + # locking. Note that "single frame" methods technically do not need this kind of synchronization + # (no incorrect frame interleaving of the same kind as with basic.publish isn't possible) but we + # still recommend not sharing channels between threads except for consumer-only cases in the docs. MK. channel.synchronize do # see rabbitmq/rabbitmq-server#156 data = frames.reduce("") { |acc, frame| acc << frame.encode } @transport.write(data) signal_activity! @@ -1085,11 +1088,11 @@ def send_frameset_without_timeout(frames, channel) # some developers end up sharing channels between threads and when multiple # threads publish on the same channel aggressively, at some point frames will be # delivered out of order and broker will raise 505 UNEXPECTED_FRAME exception. # If we synchronize on the channel, however, this is both thread safe and pretty fine-grained - # locking. Note that "single frame" methods do not need this kind of synchronization. MK. + # locking. See a note about "single frame" methods in a comment in `send_frameset`. MK. channel.synchronize do frames.each { |frame| self.send_frame_without_timeout(frame, false) } signal_activity! end end # send_frameset_without_timeout(frames) @@ -1175,29 +1178,21 @@ negotiate_value(@client_heartbeat, connection_tune.heartbeat) end @logger.debug { "Heartbeat interval negotiation: client = #{@client_heartbeat}, server = #{connection_tune.heartbeat}, result = #{@heartbeat}" } @logger.info "Heartbeat interval used (in seconds): #{@heartbeat}" - # We set the read_write_timeout to twice the heartbeat value + # We set the read_write_timeout to twice the heartbeat value, + # and then some padding for edge cases. # This allows us to miss a single heartbeat before we time out the socket. - # - # Since RabbitMQ can be configured to disable heartbeats (bad idea but technically - # possible nonetheless), we need to take both client and server values into - # consideration when deciding about using the heartbeat value for read timeouts. - @transport.read_timeout = if heartbeat_disabled?(@client_heartbeat) || heartbeat_disabled?(@heartbeat) - @logger.debug { "Will use default socket read timeout of #{Transport::DEFAULT_READ_TIMEOUT}" } - Transport::DEFAULT_READ_TIMEOUT - else - # pad to account for edge cases. MK. - n = @heartbeat * 2.2 - @logger.debug { "Will use socket read timeout of #{n}" } - n - end + # If heartbeats are disabled, assume that TCP keepalives or a similar mechanism will be used + # and disable socket read timeouts. See ruby-amqp/bunny#551. + @transport.read_timeout = @heartbeat * 2.2 + @logger.debug { "Will use socket read timeout of #{@transport.read_timeout}" } - # if there are existing channels we've just recovered from # a network failure and need to fix the allocated set. See issue 205. MK. if @channels.empty? + @logger.debug { "Initializing channel ID allocator with channel_max = #{@channel_max}" } @channel_id_allocator = ChannelIdAllocator.new(@channel_max) end @transport.send_frame(AMQ::Protocol::Connection::TuneOk.encode(@channel_max, @frame_max, @heartbeat)) @logger.debug { "Sent connection.tune-ok with heartbeat interval = #{@heartbeat}, frame_max = #{@frame_max}, channel_max = #{@channel_max}" }