lib/bunny/session.rb in bunny-1.0.7 vs lib/bunny/session.rb in bunny-1.1.0.pre1
- old
+ new
@@ -34,14 +34,10 @@
DEFAULT_PASSWORD = "guest"
# Default heartbeat interval, the same value as RabbitMQ 3.0 uses.
DEFAULT_HEARTBEAT = :server
# @private
DEFAULT_FRAME_MAX = 131072
- # 2^16 - 1, maximum representable signed 16 bit integer.
- # @private
- CHANNEL_MAX_LIMIT = 65535
- DEFAULT_CHANNEL_MAX = CHANNEL_MAX_LIMIT
# backwards compatibility
# @private
CONNECT_TIMEOUT = Transport::DEFAULT_CONNECTION_TIMEOUT
@@ -80,11 +76,11 @@
# API
#
# @return [Bunny::Transport]
attr_reader :transport
- attr_reader :status, :host, :port, :heartbeat, :user, :pass, :vhost, :frame_max, :channel_max, :threaded
+ attr_reader :status, :host, :port, :heartbeat, :user, :pass, :vhost, :frame_max, :threaded
attr_reader :server_capabilities, :server_properties, :server_authentication_mechanisms, :server_locales
attr_reader :default_channel
attr_reader :channel_id_allocator
# Authentication mechanism, e.g. "PLAIN" or "EXTERNAL"
# @return [String]
@@ -108,10 +104,11 @@
# @option connection_string_or_opts [Boolean] :tls (false) Should TLS/SSL be used?
# @option connection_string_or_opts [String] :tls_cert (nil) Path to client TLS/SSL certificate file (.pem)
# @option connection_string_or_opts [String] :tls_key (nil) Path to client TLS/SSL private key file (.pem)
# @option connection_string_or_opts [Array<String>] :tls_ca_certificates Array of paths to TLS/SSL CA files (.pem), by default detected from OpenSSL configuration
# @option connection_string_or_opts [Integer] :continuation_timeout (4000) Timeout for client operations that expect a response (e.g. {Bunny::Queue#get}), in milliseconds.
+ # @option connection_string_or_opts [Integer] :connection_timeout (5) Timeout in seconds for connecting to the server.
#
# @option optz [String] :auth_mechanism ("PLAIN") Authentication mechanism, PLAIN or EXTERNAL
# @option optz [String] :locale ("PLAIN") Locale RabbitMQ should use
#
# @see http://rubybunny.info/articles/connecting.html Connecting to RabbitMQ guide
@@ -134,11 +131,11 @@
@pass = self.password_from(opts)
@vhost = self.vhost_from(opts)
@logfile = opts[:log_file] || opts[:logfile] || STDOUT
@threaded = opts.fetch(:threaded, true)
- self.init_logger(opts[:log_level] || ENV["BUNNY_LOG_LEVEL"] || Logger::WARN)
+ @logger = opts.fetch(:logger, init_logger(opts[:log_level] || ENV["BUNNY_LOG_LEVEL"] || Logger::WARN))
# should automatic recovery from network failures be used?
@automatically_recover = if opts[:automatically_recover].nil? && opts[:automatic_recovery].nil?
true
else
@@ -151,13 +148,11 @@
@status = :not_connected
@blocked = false
# these are negotiated with the broker during the connection tuning phase
@client_frame_max = opts.fetch(:frame_max, DEFAULT_FRAME_MAX)
- @client_channel_max = normalize_client_channel_max(opts.fetch(:channel_max, DEFAULT_CHANNEL_MAX))
- # will be-renegotiated during connection tuning steps. MK.
- @channel_max = @client_channel_max
+ @client_channel_max = opts.fetch(:channel_max, 65536)
@client_heartbeat = self.heartbeat_from(opts)
@client_properties = opts[:properties] || DEFAULT_CLIENT_PROPERTIES
@mechanism = opts.fetch(:auth_mechanism, "PLAIN")
@credentials_encoder = credentials_encoder_for(@mechanism)
@@ -270,12 +265,10 @@
# thread until the response is received and the channel is guaranteed to be
# opened (this operation is very fast and inexpensive).
#
# @return [Bunny::Channel] Newly opened channel
def create_channel(n = nil, consumer_pool_size = 1)
- raise ArgumentError, "channel number 0 is reserved in the protocol and cannot be used" if 0 == n
-
if n && (ch = @channels[n])
ch
else
ch = Bunny::Channel.new(self, n, ConsumerWorkPool.new(consumer_pool_size || 1))
ch.open
@@ -305,15 +298,12 @@
# method and closes it.
#
# @return [Bunny::Session] self
def with_channel(n = nil)
ch = create_channel(n)
- begin
- yield ch
- ensure
- ch.close if ch.open?
- end
+ yield ch
+ ch.close if ch.open?
self
end
# @return [Boolean] true if this connection is still not fully open
@@ -655,12 +645,10 @@
ChannelError
when 505 then
UnexpectedFrame
when 506 then
ResourceError
- when 530 then
- NotAllowedError
when 541 then
InternalError
else
raise "Unknown reply code: #{frame.reply_code}, text: #{frame.reply_text}"
end
@@ -948,29 +936,11 @@
@status = :open
if @heartbeat && @heartbeat > 0
initialize_heartbeat_sender
end
- unless connection_open_ok.is_a?(AMQ::Protocol::Connection::OpenOk)
- if connection_open_ok.is_a?(AMQ::Protocol::Connection::Close)
- e = instantiate_connection_level_exception(connection_open_ok)
- begin
- shut_down_all_consumer_work_pools!
- maybe_shutdown_reader_loop
- rescue ShutdownSignal => sse
- # no-op
- rescue Exception => e
- @logger.warn "Caught an exception when cleaning up after receiving connection.close: #{e.message}"
- ensure
- close_transport
- end
-
- @origin_thread.raise(e)
- else
- raise "could not open connection: server did not respond with connection.open-ok but #{connection_open_ok.inspect} instead"
- end
- end
+ raise "could not open connection: server did not respond with connection.open-ok" unless connection_open_ok.is_a?(AMQ::Protocol::Connection::OpenOk)
end
def heartbeat_disabled?(val)
0 == val || val.nil?
end
@@ -986,11 +956,10 @@
end
end
# @private
def initialize_heartbeat_sender
- maybe_shutdown_heartbeat_sender
@logger.debug "Initializing heartbeat sender..."
@heartbeat_sender = HeartbeatSender.new(@transport, @logger)
@heartbeat_sender.start(@heartbeat)
end
@@ -1048,15 +1017,15 @@
@continuations.poll(@continuation_timeout)
end
# @private
def init_logger(level)
- @logger = ::Logger.new(@logfile)
- @logger.level = normalize_log_level(level)
- @logger.progname = self.to_s
+ lgr = ::Logger.new(@logfile)
+ lgr.level = normalize_log_level(level)
+ lgr.progname = self.to_s
- @logger
+ lgr
end
# @private
def normalize_log_level(level)
case level
@@ -1065,27 +1034,9 @@
when :warn, Logger::WARN, "warn" then Logger::WARN
when :error, Logger::ERROR, "error" then Logger::ERROR
when :fatal, Logger::FATAL, "fatal" then Logger::FATAL
else
Logger::WARN
- end
- end
-
- # @private
- def shut_down_all_consumer_work_pools!
- @channels.each do |_, ch|
- ch.maybe_kill_consumer_work_pool!
- end
- end
-
- def normalize_client_channel_max(n)
- return CHANNEL_MAX_LIMIT if n > CHANNEL_MAX_LIMIT
-
- case n
- when 0 then
- CHANNEL_MAX_LIMIT
- else
- n
end
end
end # Session
# backwards compatibility