lib/bunny/session.rb in bunny-1.1.0.pre1 vs lib/bunny/session.rb in bunny-1.1.0.pre2

- old
+ new

@@ -34,10 +34,14 @@ DEFAULT_PASSWORD = "guest" # Default heartbeat interval, the same value as RabbitMQ 3.0 uses. DEFAULT_HEARTBEAT = :server # @private DEFAULT_FRAME_MAX = 131072 + # 2^16 - 1, maximum representable signed 16 bit integer. + # @private + CHANNEL_MAX_LIMIT = 65535 + DEFAULT_CHANNEL_MAX = CHANNEL_MAX_LIMIT # backwards compatibility # @private CONNECT_TIMEOUT = Transport::DEFAULT_CONNECTION_TIMEOUT @@ -76,11 +80,11 @@ # API # # @return [Bunny::Transport] attr_reader :transport - attr_reader :status, :host, :port, :heartbeat, :user, :pass, :vhost, :frame_max, :threaded + attr_reader :status, :host, :port, :heartbeat, :user, :pass, :vhost, :frame_max, :channel_max, :threaded attr_reader :server_capabilities, :server_properties, :server_authentication_mechanisms, :server_locales attr_reader :default_channel attr_reader :channel_id_allocator # Authentication mechanism, e.g. "PLAIN" or "EXTERNAL" # @return [String] @@ -148,11 +152,13 @@ @status = :not_connected @blocked = false # these are negotiated with the broker during the connection tuning phase @client_frame_max = opts.fetch(:frame_max, DEFAULT_FRAME_MAX) - @client_channel_max = opts.fetch(:channel_max, 65536) + @client_channel_max = normalize_client_channel_max(opts.fetch(:channel_max, DEFAULT_CHANNEL_MAX)) + # will be-renegotiated during connection tuning steps. MK. + @channel_max = @client_channel_max @client_heartbeat = self.heartbeat_from(opts) @client_properties = opts[:properties] || DEFAULT_CLIENT_PROPERTIES @mechanism = opts.fetch(:auth_mechanism, "PLAIN") @credentials_encoder = credentials_encoder_for(@mechanism) @@ -286,13 +292,14 @@ self.close_connection(true) end maybe_shutdown_reader_loop close_transport - - @status = :closed end + + shut_down_all_consumer_work_pools! + @status = :closed end alias stop close # Creates a temporary channel, yields it to the block given to this # method and closes it. @@ -478,17 +485,18 @@ # @private def close_connection(sync = true) if @transport.open? @transport.send_frame(AMQ::Protocol::Connection::Close.encode(200, "Goodbye", 0, 0)) - maybe_shutdown_heartbeat_sender - @status = :not_connected - if sync @last_connection_close_ok = wait_on_continuations end end + + shut_down_all_consumer_work_pools! + maybe_shutdown_heartbeat_sender + @status = :not_connected end # Handles incoming frames and dispatches them. # # Channel methods (`channel.open-ok`, `channel.close-ok`) are @@ -506,10 +514,21 @@ @continuations.push(method) when AMQ::Protocol::Connection::Close then @last_connection_error = instantiate_connection_level_exception(method) @continuations.push(method) + begin + shut_down_all_consumer_work_pools! + maybe_shutdown_reader_loop + rescue ShutdownSignal => sse + # no-op + rescue Exception => e + @logger.warn "Caught an exception when cleaning up after receiving connection.close: #{e.message}" + ensure + close_transport + end + @origin_thread.raise(@last_connection_error) when AMQ::Protocol::Connection::CloseOk then @last_connection_close_ok = method begin @continuations.clear @@ -645,10 +664,12 @@ ChannelError when 505 then UnexpectedFrame when 506 then ResourceError + when 530 then + NotAllowedError when 541 then InternalError else raise "Unknown reply code: #{frame.reply_code}, text: #{frame.reply_text}" end @@ -936,11 +957,29 @@ @status = :open if @heartbeat && @heartbeat > 0 initialize_heartbeat_sender end - raise "could not open connection: server did not respond with connection.open-ok" unless connection_open_ok.is_a?(AMQ::Protocol::Connection::OpenOk) + unless connection_open_ok.is_a?(AMQ::Protocol::Connection::OpenOk) + if connection_open_ok.is_a?(AMQ::Protocol::Connection::Close) + e = instantiate_connection_level_exception(connection_open_ok) + begin + shut_down_all_consumer_work_pools! + maybe_shutdown_reader_loop + rescue ShutdownSignal => sse + # no-op + rescue Exception => e + @logger.warn "Caught an exception when cleaning up after receiving connection.close: #{e.message}" + ensure + close_transport + end + + @origin_thread.raise(e) + else + raise "could not open connection: server did not respond with connection.open-ok but #{connection_open_ok.inspect} instead" + end + end end def heartbeat_disabled?(val) 0 == val || val.nil? end @@ -956,10 +995,11 @@ end end # @private def initialize_heartbeat_sender + maybe_shutdown_heartbeat_sender @logger.debug "Initializing heartbeat sender..." @heartbeat_sender = HeartbeatSender.new(@transport, @logger) @heartbeat_sender.start(@heartbeat) end @@ -1034,9 +1074,27 @@ when :warn, Logger::WARN, "warn" then Logger::WARN when :error, Logger::ERROR, "error" then Logger::ERROR when :fatal, Logger::FATAL, "fatal" then Logger::FATAL else Logger::WARN + end + end + + # @private + def shut_down_all_consumer_work_pools! + @channels.each do |_, ch| + ch.maybe_kill_consumer_work_pool! + end + end + + def normalize_client_channel_max(n) + return CHANNEL_MAX_LIMIT if n > CHANNEL_MAX_LIMIT + + case n + when 0 then + CHANNEL_MAX_LIMIT + else + n end end end # Session # backwards compatibility