lib/bunny/session.rb in bunny-0.9.0.pre3 vs lib/bunny/session.rb in bunny-0.9.0.pre4
- old
+ new
@@ -3,10 +3,13 @@
require "bunny/transport"
require "bunny/channel_id_allocator"
require "bunny/heartbeat_sender"
require "bunny/main_loop"
+require "bunny/authentication/credentials_encoder"
+require "bunny/authentication/plain_mechanism_encoder"
+require "bunny/authentication/external_mechanism_encoder"
require "bunny/concurrent/condition"
require "amq/protocol/client"
require "amq/settings"
@@ -16,24 +19,22 @@
DEFAULT_HOST = "127.0.0.1"
DEFAULT_VHOST = "/"
DEFAULT_USER = "guest"
DEFAULT_PASSWORD = "guest"
- # 0 means "no heartbeat". This is the same default RabbitMQ Java client and amqp gem
- # use.
- DEFAULT_HEARTBEAT = 0
+ # the same value as RabbitMQ 3.0 uses. MK.
+ DEFAULT_HEARTBEAT = 600
# 128K
DEFAULT_FRAME_MAX = 131072
# backwards compatibility
CONNECT_TIMEOUT = Transport::DEFAULT_CONNECTION_TIMEOUT
DEFAULT_CLIENT_PROPERTIES = {
- # once we support AMQP 0.9.1 extensions, this needs to be updated. MK.
:capabilities => {
- # :publisher_confirms => true,
+ :publisher_confirms => true,
:consumer_cancel_notify => true,
:exchange_exchange_bindings => true,
:"basic.nack" => true
},
:product => "Bunny",
@@ -51,11 +52,15 @@
attr_reader :status, :host, :port, :heartbeat, :user, :pass, :vhost, :frame_max
attr_reader :server_capabilities, :server_properties, :server_authentication_mechanisms, :server_locales
attr_reader :default_channel
attr_reader :channel_id_allocator
+ # Authentication mechanism, e.g. "PLAIN" or "EXTERNAL"
+ # @return [String]
+ attr_reader :mechanism
+
def initialize(connection_string_or_opts = Hash.new, optz = Hash.new)
opts = case (ENV["RABBITMQ_URL"] || connection_string_or_opts)
when nil then
Hash.new
when String then
@@ -78,17 +83,18 @@
# these are negotiated with the broker during the connection tuning phase
@client_frame_max = opts.fetch(:frame_max, DEFAULT_FRAME_MAX)
@client_channel_max = opts.fetch(:channel_max, 65536)
@client_heartbeat = self.heartbeat_from(opts)
- @client_properties = opts[:properties] || DEFAULT_CLIENT_PROPERTIES
- @mechanism = "PLAIN"
- @locale = @opts.fetch(:locale, DEFAULT_LOCALE)
- @channel_mutex = Mutex.new
- @channels = Hash.new
+ @client_properties = opts[:properties] || DEFAULT_CLIENT_PROPERTIES
+ @mechanism = opts.fetch(:auth_mechanism, "PLAIN")
+ @credentials_encoder = credentials_encoder_for(@mechanism)
+ @locale = @opts.fetch(:locale, DEFAULT_LOCALE)
+ @channel_mutex = Mutex.new
+ @channels = Hash.new
- @continuations = ::Queue.new
+ @continuations = ::Queue.new
end
def hostname; self.host; end
def username; self.user; end
def password; self.pass; end
@@ -443,11 +449,16 @@
connection_tune = frame.decode_payload
@frame_max = negotiate_value(@client_frame_max, connection_tune.frame_max)
@channel_max = negotiate_value(@client_channel_max, connection_tune.channel_max)
- @heartbeat = negotiate_value(@client_heartbeat, connection_tune.heartbeat)
+ # this allows for disabled heartbeats. MK.
+ @heartbeat = if 0 == @client_heartbeat || @client_heartbeat.nil?
+ 0
+ else
+ negotiate_value(@client_heartbeat, connection_tune.heartbeat)
+ end
@channel_id_allocator = ChannelIdAllocator.new(@channel_max)
@transport.send_frame(AMQ::Protocol::Connection::TuneOk.encode(@channel_max, @frame_max, @heartbeat))
@transport.send_frame(AMQ::Protocol::Connection::Open.encode(self.vhost))
@@ -498,13 +509,16 @@
# @api plugin
- # @see http://tools.ietf.org/rfc/rfc2595.txt RFC 2595
def encode_credentials(username, password)
- "\0#{username}\0#{password}"
+ @credentials_encoder.encode_credentials(username, password)
end # encode_credentials(username, password)
+
+ def credentials_encoder_for(mechanism)
+ Authentication::CredentialsEncoder.for_session(self)
+ end
end # Session
# backwards compatibility
Client = Session
end