lib/bunny/session.rb in bunny-2.2.2 vs lib/bunny/session.rb in bunny-2.3.0

- old
+ new

@@ -84,12 +84,13 @@ # Authentication mechanism, e.g. "PLAIN" or "EXTERNAL" # @return [String] attr_reader :mechanism # @return [Logger] attr_reader :logger - # @return [Integer] Timeout for blocking protocol operations (queue.declare, queue.bind, etc), in milliseconds. Default is 4000. + # @return [Integer] Timeout for blocking protocol operations (queue.declare, queue.bind, etc), in milliseconds. Default is 15000. attr_reader :continuation_timeout + attr_reader :network_recovery_interval # @param [String, Hash] connection_string_or_opts Connection string or a hash of connection options # @param [Hash] optz Extra options not related to connection # @@ -105,11 +106,11 @@ # @option connection_string_or_opts [Boolean] :tls (false) Should TLS/SSL be used? # @option connection_string_or_opts [String] :tls_cert (nil) Path to client TLS/SSL certificate file (.pem) # @option connection_string_or_opts [String] :tls_key (nil) Path to client TLS/SSL private key file (.pem) # @option connection_string_or_opts [Array<String>] :tls_ca_certificates Array of paths to TLS/SSL CA files (.pem), by default detected from OpenSSL configuration # @option connection_string_or_opts [String] :verify_peer (true) Whether TLS peer verification should be performed - # @option connection_string_or_opts [Integer] :continuation_timeout (4000) Timeout for client operations that expect a response (e.g. {Bunny::Queue#get}), in milliseconds. + # @option connection_string_or_opts [Integer] :continuation_timeout (15000) Timeout for client operations that expect a response (e.g. {Bunny::Queue#get}), in milliseconds. # @option connection_string_or_opts [Integer] :connection_timeout (5) Timeout in seconds for connecting to the server. # @option connection_string_or_opts [Proc] :hosts_shuffle_strategy A Proc that reorders a list of host strings, defaults to Array#shuffle # @option connection_string_or_opts [Logger] :logger The logger. If missing, one is created using :log_file and :log_level. # @option connection_string_or_opts [IO, String] :log_file The file or path to use when creating a logger. Defaults to STDOUT. # @option connection_string_or_opts [IO, String] :logfile DEPRECATED: use :log_file instead. The file or path to use when creating a logger. Defaults to STDOUT. @@ -328,18 +329,18 @@ # Opens a new channel and returns it. This method will block the calling # thread until the response is received and the channel is guaranteed to be # opened (this operation is very fast and inexpensive). # # @return [Bunny::Channel] Newly opened channel - def create_channel(n = nil, consumer_pool_size = 1) + def create_channel(n = nil, consumer_pool_size = 1, consumer_pool_abort_on_exception = false) raise ArgumentError, "channel number 0 is reserved in the protocol and cannot be used" if 0 == n @channel_mutex.synchronize do if n && (ch = @channels[n]) ch else - ch = Bunny::Channel.new(self, n, ConsumerWorkPool.new(consumer_pool_size || 1)) + ch = Bunny::Channel.new(self, n, ConsumerWorkPool.new(consumer_pool_size || 1, consumer_pool_abort_on_exception)) ch.open ch end end end @@ -545,11 +546,11 @@ # Connection level errors result in exceptions being raised. # Deliveries and other methods are passed on to channels to dispatch. # # @private def handle_frame(ch_number, method) - @logger.debug "Session#handle_frame on #{ch_number}: #{method.inspect}" + @logger.debug { "Session#handle_frame on #{ch_number}: #{method.inspect}" } case method when AMQ::Protocol::Channel::OpenOk then @continuations.push(method) when AMQ::Protocol::Channel::CloseOk then @continuations.push(method) @@ -634,10 +635,11 @@ if recoverable_network_failure?(exception) @logger.warn "Recovering from a network failure..." @channels.each do |n, ch| ch.maybe_kill_consumer_work_pool! end + @reader_loop.stop if @reader_loop maybe_shutdown_heartbeat_sender recover_from_network_failure else # TODO: investigate if we can be a bit smarter here. MK. @@ -693,13 +695,10 @@ @recovery_attempts.nil? || @recovery_attempts > 1 end # @private def recover_channels - # default channel is reopened right after connection - # negotiation is completed, so make sure we do not try to open - # it twice. MK. @channels.each do |n, ch| ch.open ch.recover_from_network_failure end @@ -932,11 +931,11 @@ else raise ConnectionClosedError.new(frame) end end - # Sends multiple frames, one by one. For thread safety this method takes a channel + # Sends multiple frames, in one go. For thread safety this method takes a channel # object and synchronizes on it. # # @private def send_frameset(frames, channel) # some developers end up sharing channels between threads and when multiple @@ -1047,11 +1046,11 @@ @heartbeat = if heartbeat_disabled?(@client_heartbeat) 0 else negotiate_value(@client_heartbeat, connection_tune.heartbeat) end - @logger.debug "Heartbeat interval negotiation: client = #{@client_heartbeat}, server = #{connection_tune.heartbeat}, result = #{@heartbeat}" + @logger.debug { "Heartbeat interval negotiation: client = #{@client_heartbeat}, server = #{connection_tune.heartbeat}, result = #{@heartbeat}" } @logger.info "Heartbeat interval used (in seconds): #{@heartbeat}" # We set the read_write_timeout to twice the heartbeat value # This allows us to miss a single heartbeat before we time out the socket. @transport.read_timeout = if heartbeat_disabled?(@client_heartbeat) @@ -1067,12 +1066,12 @@ if @channels.empty? @channel_id_allocator = ChannelIdAllocator.new(@channel_max) end @transport.send_frame(AMQ::Protocol::Connection::TuneOk.encode(@channel_max, @frame_max, @heartbeat)) - @logger.debug "Sent connection.tune-ok with heartbeat interval = #{@heartbeat}, frame_max = #{@frame_max}, channel_max = #{@channel_max}" + @logger.debug { "Sent connection.tune-ok with heartbeat interval = #{@heartbeat}, frame_max = #{@frame_max}, channel_max = #{@channel_max}" } @transport.send_frame(AMQ::Protocol::Connection::Open.encode(self.vhost)) - @logger.debug "Sent connection.open with vhost = #{self.vhost}" + @logger.debug { "Sent connection.open with vhost = #{self.vhost}" } frame2 = begin fr = @transport.read_next_frame while fr.is_a?(AMQ::Protocol::HeartbeatFrame) fr = @transport.read_next_frame