lib/bunny/session.rb in bunny-1.1.0.pre1 vs lib/bunny/session.rb in bunny-1.1.0.pre2
- old
+ new
@@ -34,10 +34,14 @@
DEFAULT_PASSWORD = "guest"
# Default heartbeat interval, the same value as RabbitMQ 3.0 uses.
DEFAULT_HEARTBEAT = :server
# @private
DEFAULT_FRAME_MAX = 131072
+ # 2^16 - 1, maximum representable signed 16 bit integer.
+ # @private
+ CHANNEL_MAX_LIMIT = 65535
+ DEFAULT_CHANNEL_MAX = CHANNEL_MAX_LIMIT
# backwards compatibility
# @private
CONNECT_TIMEOUT = Transport::DEFAULT_CONNECTION_TIMEOUT
@@ -76,11 +80,11 @@
# API
#
# @return [Bunny::Transport]
attr_reader :transport
- attr_reader :status, :host, :port, :heartbeat, :user, :pass, :vhost, :frame_max, :threaded
+ attr_reader :status, :host, :port, :heartbeat, :user, :pass, :vhost, :frame_max, :channel_max, :threaded
attr_reader :server_capabilities, :server_properties, :server_authentication_mechanisms, :server_locales
attr_reader :default_channel
attr_reader :channel_id_allocator
# Authentication mechanism, e.g. "PLAIN" or "EXTERNAL"
# @return [String]
@@ -148,11 +152,13 @@
@status = :not_connected
@blocked = false
# these are negotiated with the broker during the connection tuning phase
@client_frame_max = opts.fetch(:frame_max, DEFAULT_FRAME_MAX)
- @client_channel_max = opts.fetch(:channel_max, 65536)
+ @client_channel_max = normalize_client_channel_max(opts.fetch(:channel_max, DEFAULT_CHANNEL_MAX))
+ # will be-renegotiated during connection tuning steps. MK.
+ @channel_max = @client_channel_max
@client_heartbeat = self.heartbeat_from(opts)
@client_properties = opts[:properties] || DEFAULT_CLIENT_PROPERTIES
@mechanism = opts.fetch(:auth_mechanism, "PLAIN")
@credentials_encoder = credentials_encoder_for(@mechanism)
@@ -286,13 +292,14 @@
self.close_connection(true)
end
maybe_shutdown_reader_loop
close_transport
-
- @status = :closed
end
+
+ shut_down_all_consumer_work_pools!
+ @status = :closed
end
alias stop close
# Creates a temporary channel, yields it to the block given to this
# method and closes it.
@@ -478,17 +485,18 @@
# @private
def close_connection(sync = true)
if @transport.open?
@transport.send_frame(AMQ::Protocol::Connection::Close.encode(200, "Goodbye", 0, 0))
- maybe_shutdown_heartbeat_sender
- @status = :not_connected
-
if sync
@last_connection_close_ok = wait_on_continuations
end
end
+
+ shut_down_all_consumer_work_pools!
+ maybe_shutdown_heartbeat_sender
+ @status = :not_connected
end
# Handles incoming frames and dispatches them.
#
# Channel methods (`channel.open-ok`, `channel.close-ok`) are
@@ -506,10 +514,21 @@
@continuations.push(method)
when AMQ::Protocol::Connection::Close then
@last_connection_error = instantiate_connection_level_exception(method)
@continuations.push(method)
+ begin
+ shut_down_all_consumer_work_pools!
+ maybe_shutdown_reader_loop
+ rescue ShutdownSignal => sse
+ # no-op
+ rescue Exception => e
+ @logger.warn "Caught an exception when cleaning up after receiving connection.close: #{e.message}"
+ ensure
+ close_transport
+ end
+
@origin_thread.raise(@last_connection_error)
when AMQ::Protocol::Connection::CloseOk then
@last_connection_close_ok = method
begin
@continuations.clear
@@ -645,10 +664,12 @@
ChannelError
when 505 then
UnexpectedFrame
when 506 then
ResourceError
+ when 530 then
+ NotAllowedError
when 541 then
InternalError
else
raise "Unknown reply code: #{frame.reply_code}, text: #{frame.reply_text}"
end
@@ -936,11 +957,29 @@
@status = :open
if @heartbeat && @heartbeat > 0
initialize_heartbeat_sender
end
- raise "could not open connection: server did not respond with connection.open-ok" unless connection_open_ok.is_a?(AMQ::Protocol::Connection::OpenOk)
+ unless connection_open_ok.is_a?(AMQ::Protocol::Connection::OpenOk)
+ if connection_open_ok.is_a?(AMQ::Protocol::Connection::Close)
+ e = instantiate_connection_level_exception(connection_open_ok)
+ begin
+ shut_down_all_consumer_work_pools!
+ maybe_shutdown_reader_loop
+ rescue ShutdownSignal => sse
+ # no-op
+ rescue Exception => e
+ @logger.warn "Caught an exception when cleaning up after receiving connection.close: #{e.message}"
+ ensure
+ close_transport
+ end
+
+ @origin_thread.raise(e)
+ else
+ raise "could not open connection: server did not respond with connection.open-ok but #{connection_open_ok.inspect} instead"
+ end
+ end
end
def heartbeat_disabled?(val)
0 == val || val.nil?
end
@@ -956,10 +995,11 @@
end
end
# @private
def initialize_heartbeat_sender
+ maybe_shutdown_heartbeat_sender
@logger.debug "Initializing heartbeat sender..."
@heartbeat_sender = HeartbeatSender.new(@transport, @logger)
@heartbeat_sender.start(@heartbeat)
end
@@ -1034,9 +1074,27 @@
when :warn, Logger::WARN, "warn" then Logger::WARN
when :error, Logger::ERROR, "error" then Logger::ERROR
when :fatal, Logger::FATAL, "fatal" then Logger::FATAL
else
Logger::WARN
+ end
+ end
+
+ # @private
+ def shut_down_all_consumer_work_pools!
+ @channels.each do |_, ch|
+ ch.maybe_kill_consumer_work_pool!
+ end
+ end
+
+ def normalize_client_channel_max(n)
+ return CHANNEL_MAX_LIMIT if n > CHANNEL_MAX_LIMIT
+
+ case n
+ when 0 then
+ CHANNEL_MAX_LIMIT
+ else
+ n
end
end
end # Session
# backwards compatibility