lib/bunny/session.rb in bunny-2.2.2 vs lib/bunny/session.rb in bunny-2.3.0
- old
+ new
@@ -84,12 +84,13 @@
# Authentication mechanism, e.g. "PLAIN" or "EXTERNAL"
# @return [String]
attr_reader :mechanism
# @return [Logger]
attr_reader :logger
- # @return [Integer] Timeout for blocking protocol operations (queue.declare, queue.bind, etc), in milliseconds. Default is 4000.
+ # @return [Integer] Timeout for blocking protocol operations (queue.declare, queue.bind, etc), in milliseconds. Default is 15000.
attr_reader :continuation_timeout
+ attr_reader :network_recovery_interval
# @param [String, Hash] connection_string_or_opts Connection string or a hash of connection options
# @param [Hash] optz Extra options not related to connection
#
@@ -105,11 +106,11 @@
# @option connection_string_or_opts [Boolean] :tls (false) Should TLS/SSL be used?
# @option connection_string_or_opts [String] :tls_cert (nil) Path to client TLS/SSL certificate file (.pem)
# @option connection_string_or_opts [String] :tls_key (nil) Path to client TLS/SSL private key file (.pem)
# @option connection_string_or_opts [Array<String>] :tls_ca_certificates Array of paths to TLS/SSL CA files (.pem), by default detected from OpenSSL configuration
# @option connection_string_or_opts [String] :verify_peer (true) Whether TLS peer verification should be performed
- # @option connection_string_or_opts [Integer] :continuation_timeout (4000) Timeout for client operations that expect a response (e.g. {Bunny::Queue#get}), in milliseconds.
+ # @option connection_string_or_opts [Integer] :continuation_timeout (15000) Timeout for client operations that expect a response (e.g. {Bunny::Queue#get}), in milliseconds.
# @option connection_string_or_opts [Integer] :connection_timeout (5) Timeout in seconds for connecting to the server.
# @option connection_string_or_opts [Proc] :hosts_shuffle_strategy A Proc that reorders a list of host strings, defaults to Array#shuffle
# @option connection_string_or_opts [Logger] :logger The logger. If missing, one is created using :log_file and :log_level.
# @option connection_string_or_opts [IO, String] :log_file The file or path to use when creating a logger. Defaults to STDOUT.
# @option connection_string_or_opts [IO, String] :logfile DEPRECATED: use :log_file instead. The file or path to use when creating a logger. Defaults to STDOUT.
@@ -328,18 +329,18 @@
# Opens a new channel and returns it. This method will block the calling
# thread until the response is received and the channel is guaranteed to be
# opened (this operation is very fast and inexpensive).
#
# @return [Bunny::Channel] Newly opened channel
- def create_channel(n = nil, consumer_pool_size = 1)
+ def create_channel(n = nil, consumer_pool_size = 1, consumer_pool_abort_on_exception = false)
raise ArgumentError, "channel number 0 is reserved in the protocol and cannot be used" if 0 == n
@channel_mutex.synchronize do
if n && (ch = @channels[n])
ch
else
- ch = Bunny::Channel.new(self, n, ConsumerWorkPool.new(consumer_pool_size || 1))
+ ch = Bunny::Channel.new(self, n, ConsumerWorkPool.new(consumer_pool_size || 1, consumer_pool_abort_on_exception))
ch.open
ch
end
end
end
@@ -545,11 +546,11 @@
# Connection level errors result in exceptions being raised.
# Deliveries and other methods are passed on to channels to dispatch.
#
# @private
def handle_frame(ch_number, method)
- @logger.debug "Session#handle_frame on #{ch_number}: #{method.inspect}"
+ @logger.debug { "Session#handle_frame on #{ch_number}: #{method.inspect}" }
case method
when AMQ::Protocol::Channel::OpenOk then
@continuations.push(method)
when AMQ::Protocol::Channel::CloseOk then
@continuations.push(method)
@@ -634,10 +635,11 @@
if recoverable_network_failure?(exception)
@logger.warn "Recovering from a network failure..."
@channels.each do |n, ch|
ch.maybe_kill_consumer_work_pool!
end
+ @reader_loop.stop if @reader_loop
maybe_shutdown_heartbeat_sender
recover_from_network_failure
else
# TODO: investigate if we can be a bit smarter here. MK.
@@ -693,13 +695,10 @@
@recovery_attempts.nil? || @recovery_attempts > 1
end
# @private
def recover_channels
- # default channel is reopened right after connection
- # negotiation is completed, so make sure we do not try to open
- # it twice. MK.
@channels.each do |n, ch|
ch.open
ch.recover_from_network_failure
end
@@ -932,11 +931,11 @@
else
raise ConnectionClosedError.new(frame)
end
end
- # Sends multiple frames, one by one. For thread safety this method takes a channel
+ # Sends multiple frames, in one go. For thread safety this method takes a channel
# object and synchronizes on it.
#
# @private
def send_frameset(frames, channel)
# some developers end up sharing channels between threads and when multiple
@@ -1047,11 +1046,11 @@
@heartbeat = if heartbeat_disabled?(@client_heartbeat)
0
else
negotiate_value(@client_heartbeat, connection_tune.heartbeat)
end
- @logger.debug "Heartbeat interval negotiation: client = #{@client_heartbeat}, server = #{connection_tune.heartbeat}, result = #{@heartbeat}"
+ @logger.debug { "Heartbeat interval negotiation: client = #{@client_heartbeat}, server = #{connection_tune.heartbeat}, result = #{@heartbeat}" }
@logger.info "Heartbeat interval used (in seconds): #{@heartbeat}"
# We set the read_write_timeout to twice the heartbeat value
# This allows us to miss a single heartbeat before we time out the socket.
@transport.read_timeout = if heartbeat_disabled?(@client_heartbeat)
@@ -1067,12 +1066,12 @@
if @channels.empty?
@channel_id_allocator = ChannelIdAllocator.new(@channel_max)
end
@transport.send_frame(AMQ::Protocol::Connection::TuneOk.encode(@channel_max, @frame_max, @heartbeat))
- @logger.debug "Sent connection.tune-ok with heartbeat interval = #{@heartbeat}, frame_max = #{@frame_max}, channel_max = #{@channel_max}"
+ @logger.debug { "Sent connection.tune-ok with heartbeat interval = #{@heartbeat}, frame_max = #{@frame_max}, channel_max = #{@channel_max}" }
@transport.send_frame(AMQ::Protocol::Connection::Open.encode(self.vhost))
- @logger.debug "Sent connection.open with vhost = #{self.vhost}"
+ @logger.debug { "Sent connection.open with vhost = #{self.vhost}" }
frame2 = begin
fr = @transport.read_next_frame
while fr.is_a?(AMQ::Protocol::HeartbeatFrame)
fr = @transport.read_next_frame