lib/bunny/session.rb in bunny-1.0.7 vs lib/bunny/session.rb in bunny-1.1.0.pre1

- old
+ new

@@ -34,14 +34,10 @@ DEFAULT_PASSWORD = "guest" # Default heartbeat interval, the same value as RabbitMQ 3.0 uses. DEFAULT_HEARTBEAT = :server # @private DEFAULT_FRAME_MAX = 131072 - # 2^16 - 1, maximum representable signed 16 bit integer. - # @private - CHANNEL_MAX_LIMIT = 65535 - DEFAULT_CHANNEL_MAX = CHANNEL_MAX_LIMIT # backwards compatibility # @private CONNECT_TIMEOUT = Transport::DEFAULT_CONNECTION_TIMEOUT @@ -80,11 +76,11 @@ # API # # @return [Bunny::Transport] attr_reader :transport - attr_reader :status, :host, :port, :heartbeat, :user, :pass, :vhost, :frame_max, :channel_max, :threaded + attr_reader :status, :host, :port, :heartbeat, :user, :pass, :vhost, :frame_max, :threaded attr_reader :server_capabilities, :server_properties, :server_authentication_mechanisms, :server_locales attr_reader :default_channel attr_reader :channel_id_allocator # Authentication mechanism, e.g. "PLAIN" or "EXTERNAL" # @return [String] @@ -108,10 +104,11 @@ # @option connection_string_or_opts [Boolean] :tls (false) Should TLS/SSL be used? # @option connection_string_or_opts [String] :tls_cert (nil) Path to client TLS/SSL certificate file (.pem) # @option connection_string_or_opts [String] :tls_key (nil) Path to client TLS/SSL private key file (.pem) # @option connection_string_or_opts [Array<String>] :tls_ca_certificates Array of paths to TLS/SSL CA files (.pem), by default detected from OpenSSL configuration # @option connection_string_or_opts [Integer] :continuation_timeout (4000) Timeout for client operations that expect a response (e.g. {Bunny::Queue#get}), in milliseconds. + # @option connection_string_or_opts [Integer] :connection_timeout (5) Timeout in seconds for connecting to the server. # # @option optz [String] :auth_mechanism ("PLAIN") Authentication mechanism, PLAIN or EXTERNAL # @option optz [String] :locale ("PLAIN") Locale RabbitMQ should use # # @see http://rubybunny.info/articles/connecting.html Connecting to RabbitMQ guide @@ -134,11 +131,11 @@ @pass = self.password_from(opts) @vhost = self.vhost_from(opts) @logfile = opts[:log_file] || opts[:logfile] || STDOUT @threaded = opts.fetch(:threaded, true) - self.init_logger(opts[:log_level] || ENV["BUNNY_LOG_LEVEL"] || Logger::WARN) + @logger = opts.fetch(:logger, init_logger(opts[:log_level] || ENV["BUNNY_LOG_LEVEL"] || Logger::WARN)) # should automatic recovery from network failures be used? @automatically_recover = if opts[:automatically_recover].nil? && opts[:automatic_recovery].nil? true else @@ -151,13 +148,11 @@ @status = :not_connected @blocked = false # these are negotiated with the broker during the connection tuning phase @client_frame_max = opts.fetch(:frame_max, DEFAULT_FRAME_MAX) - @client_channel_max = normalize_client_channel_max(opts.fetch(:channel_max, DEFAULT_CHANNEL_MAX)) - # will be-renegotiated during connection tuning steps. MK. - @channel_max = @client_channel_max + @client_channel_max = opts.fetch(:channel_max, 65536) @client_heartbeat = self.heartbeat_from(opts) @client_properties = opts[:properties] || DEFAULT_CLIENT_PROPERTIES @mechanism = opts.fetch(:auth_mechanism, "PLAIN") @credentials_encoder = credentials_encoder_for(@mechanism) @@ -270,12 +265,10 @@ # thread until the response is received and the channel is guaranteed to be # opened (this operation is very fast and inexpensive). # # @return [Bunny::Channel] Newly opened channel def create_channel(n = nil, consumer_pool_size = 1) - raise ArgumentError, "channel number 0 is reserved in the protocol and cannot be used" if 0 == n - if n && (ch = @channels[n]) ch else ch = Bunny::Channel.new(self, n, ConsumerWorkPool.new(consumer_pool_size || 1)) ch.open @@ -305,15 +298,12 @@ # method and closes it. # # @return [Bunny::Session] self def with_channel(n = nil) ch = create_channel(n) - begin - yield ch - ensure - ch.close if ch.open? - end + yield ch + ch.close if ch.open? self end # @return [Boolean] true if this connection is still not fully open @@ -655,12 +645,10 @@ ChannelError when 505 then UnexpectedFrame when 506 then ResourceError - when 530 then - NotAllowedError when 541 then InternalError else raise "Unknown reply code: #{frame.reply_code}, text: #{frame.reply_text}" end @@ -948,29 +936,11 @@ @status = :open if @heartbeat && @heartbeat > 0 initialize_heartbeat_sender end - unless connection_open_ok.is_a?(AMQ::Protocol::Connection::OpenOk) - if connection_open_ok.is_a?(AMQ::Protocol::Connection::Close) - e = instantiate_connection_level_exception(connection_open_ok) - begin - shut_down_all_consumer_work_pools! - maybe_shutdown_reader_loop - rescue ShutdownSignal => sse - # no-op - rescue Exception => e - @logger.warn "Caught an exception when cleaning up after receiving connection.close: #{e.message}" - ensure - close_transport - end - - @origin_thread.raise(e) - else - raise "could not open connection: server did not respond with connection.open-ok but #{connection_open_ok.inspect} instead" - end - end + raise "could not open connection: server did not respond with connection.open-ok" unless connection_open_ok.is_a?(AMQ::Protocol::Connection::OpenOk) end def heartbeat_disabled?(val) 0 == val || val.nil? end @@ -986,11 +956,10 @@ end end # @private def initialize_heartbeat_sender - maybe_shutdown_heartbeat_sender @logger.debug "Initializing heartbeat sender..." @heartbeat_sender = HeartbeatSender.new(@transport, @logger) @heartbeat_sender.start(@heartbeat) end @@ -1048,15 +1017,15 @@ @continuations.poll(@continuation_timeout) end # @private def init_logger(level) - @logger = ::Logger.new(@logfile) - @logger.level = normalize_log_level(level) - @logger.progname = self.to_s + lgr = ::Logger.new(@logfile) + lgr.level = normalize_log_level(level) + lgr.progname = self.to_s - @logger + lgr end # @private def normalize_log_level(level) case level @@ -1065,27 +1034,9 @@ when :warn, Logger::WARN, "warn" then Logger::WARN when :error, Logger::ERROR, "error" then Logger::ERROR when :fatal, Logger::FATAL, "fatal" then Logger::FATAL else Logger::WARN - end - end - - # @private - def shut_down_all_consumer_work_pools! - @channels.each do |_, ch| - ch.maybe_kill_consumer_work_pool! - end - end - - def normalize_client_channel_max(n) - return CHANNEL_MAX_LIMIT if n > CHANNEL_MAX_LIMIT - - case n - when 0 then - CHANNEL_MAX_LIMIT - else - n end end end # Session # backwards compatibility