lib/bunny/session.rb in bunny-1.0.4 vs lib/bunny/session.rb in bunny-1.0.5
- old
+ new
@@ -34,10 +34,14 @@
DEFAULT_PASSWORD = "guest"
# Default heartbeat interval, the same value as RabbitMQ 3.0 uses.
DEFAULT_HEARTBEAT = :server
# @private
DEFAULT_FRAME_MAX = 131072
+ # 2^16 - 1, maximum representable signed 16 bit integer.
+ # @private
+ CHANNEL_MAX_LIMIT = 65535
+ DEFAULT_CHANNEL_MAX = CHANNEL_MAX_LIMIT
# backwards compatibility
# @private
CONNECT_TIMEOUT = Transport::DEFAULT_CONNECTION_TIMEOUT
@@ -76,11 +80,11 @@
# API
#
# @return [Bunny::Transport]
attr_reader :transport
- attr_reader :status, :host, :port, :heartbeat, :user, :pass, :vhost, :frame_max, :threaded
+ attr_reader :status, :host, :port, :heartbeat, :user, :pass, :vhost, :frame_max, :channel_max, :threaded
attr_reader :server_capabilities, :server_properties, :server_authentication_mechanisms, :server_locales
attr_reader :default_channel
attr_reader :channel_id_allocator
# Authentication mechanism, e.g. "PLAIN" or "EXTERNAL"
# @return [String]
@@ -147,11 +151,13 @@
@status = :not_connected
@blocked = false
# these are negotiated with the broker during the connection tuning phase
@client_frame_max = opts.fetch(:frame_max, DEFAULT_FRAME_MAX)
- @client_channel_max = opts.fetch(:channel_max, 65536)
+ @client_channel_max = normalize_client_channel_max(opts.fetch(:channel_max, DEFAULT_CHANNEL_MAX))
+ # will be-renegotiated during connection tuning steps. MK.
+ @channel_max = @client_channel_max
@client_heartbeat = self.heartbeat_from(opts)
@client_properties = opts[:properties] || DEFAULT_CLIENT_PROPERTIES
@mechanism = opts.fetch(:auth_mechanism, "PLAIN")
@credentials_encoder = credentials_encoder_for(@mechanism)
@@ -644,10 +650,12 @@
ChannelError
when 505 then
UnexpectedFrame
when 506 then
ResourceError
+ when 530 then
+ NotAllowedError
when 541 then
InternalError
else
raise "Unknown reply code: #{frame.reply_code}, text: #{frame.reply_text}"
end
@@ -935,11 +943,29 @@
@status = :open
if @heartbeat && @heartbeat > 0
initialize_heartbeat_sender
end
- raise "could not open connection: server did not respond with connection.open-ok" unless connection_open_ok.is_a?(AMQ::Protocol::Connection::OpenOk)
+ unless connection_open_ok.is_a?(AMQ::Protocol::Connection::OpenOk)
+ if connection_open_ok.is_a?(AMQ::Protocol::Connection::Close)
+ e = instantiate_connection_level_exception(connection_open_ok)
+ begin
+ shut_down_all_consumer_work_pools!
+ maybe_shutdown_reader_loop
+ rescue ShutdownSignal => sse
+ # no-op
+ rescue Exception => e
+ @logger.warn "Caught an exception when cleaning up after receiving connection.close: #{e.message}"
+ ensure
+ close_transport
+ end
+
+ @origin_thread.raise(e)
+ else
+ raise "could not open connection: server did not respond with connection.open-ok but #{connection_open_ok.inspect} instead"
+ end
+ end
end
def heartbeat_disabled?(val)
0 == val || val.nil?
end
@@ -955,10 +981,11 @@
end
end
# @private
def initialize_heartbeat_sender
+ maybe_shutdown_heartbeat_sender
@logger.debug "Initializing heartbeat sender..."
@heartbeat_sender = HeartbeatSender.new(@transport, @logger)
@heartbeat_sender.start(@heartbeat)
end
@@ -1033,9 +1060,27 @@
when :warn, Logger::WARN, "warn" then Logger::WARN
when :error, Logger::ERROR, "error" then Logger::ERROR
when :fatal, Logger::FATAL, "fatal" then Logger::FATAL
else
Logger::WARN
+ end
+ end
+
+ # @private
+ def shut_down_all_consumer_work_pools!
+ @channels.each do |_, ch|
+ ch.maybe_kill_consumer_work_pool!
+ end
+ end
+
+ def normalize_client_channel_max(n)
+ return CHANNEL_MAX_LIMIT if n > CHANNEL_MAX_LIMIT
+
+ case n
+ when 0 then
+ CHANNEL_MAX_LIMIT
+ else
+ n
end
end
end # Session
# backwards compatibility