lib/bunny/session.rb in bunny-0.9.0.pre3 vs lib/bunny/session.rb in bunny-0.9.0.pre4

- old
+ new

@@ -3,10 +3,13 @@ require "bunny/transport" require "bunny/channel_id_allocator" require "bunny/heartbeat_sender" require "bunny/main_loop" +require "bunny/authentication/credentials_encoder" +require "bunny/authentication/plain_mechanism_encoder" +require "bunny/authentication/external_mechanism_encoder" require "bunny/concurrent/condition" require "amq/protocol/client" require "amq/settings" @@ -16,24 +19,22 @@ DEFAULT_HOST = "127.0.0.1" DEFAULT_VHOST = "/" DEFAULT_USER = "guest" DEFAULT_PASSWORD = "guest" - # 0 means "no heartbeat". This is the same default RabbitMQ Java client and amqp gem - # use. - DEFAULT_HEARTBEAT = 0 + # the same value as RabbitMQ 3.0 uses. MK. + DEFAULT_HEARTBEAT = 600 # 128K DEFAULT_FRAME_MAX = 131072 # backwards compatibility CONNECT_TIMEOUT = Transport::DEFAULT_CONNECTION_TIMEOUT DEFAULT_CLIENT_PROPERTIES = { - # once we support AMQP 0.9.1 extensions, this needs to be updated. MK. :capabilities => { - # :publisher_confirms => true, + :publisher_confirms => true, :consumer_cancel_notify => true, :exchange_exchange_bindings => true, :"basic.nack" => true }, :product => "Bunny", @@ -51,11 +52,15 @@ attr_reader :status, :host, :port, :heartbeat, :user, :pass, :vhost, :frame_max attr_reader :server_capabilities, :server_properties, :server_authentication_mechanisms, :server_locales attr_reader :default_channel attr_reader :channel_id_allocator + # Authentication mechanism, e.g. "PLAIN" or "EXTERNAL" + # @return [String] + attr_reader :mechanism + def initialize(connection_string_or_opts = Hash.new, optz = Hash.new) opts = case (ENV["RABBITMQ_URL"] || connection_string_or_opts) when nil then Hash.new when String then @@ -78,17 +83,18 @@ # these are negotiated with the broker during the connection tuning phase @client_frame_max = opts.fetch(:frame_max, DEFAULT_FRAME_MAX) @client_channel_max = opts.fetch(:channel_max, 65536) @client_heartbeat = self.heartbeat_from(opts) - @client_properties = opts[:properties] || DEFAULT_CLIENT_PROPERTIES - @mechanism = "PLAIN" - @locale = @opts.fetch(:locale, DEFAULT_LOCALE) - @channel_mutex = Mutex.new - @channels = Hash.new + @client_properties = opts[:properties] || DEFAULT_CLIENT_PROPERTIES + @mechanism = opts.fetch(:auth_mechanism, "PLAIN") + @credentials_encoder = credentials_encoder_for(@mechanism) + @locale = @opts.fetch(:locale, DEFAULT_LOCALE) + @channel_mutex = Mutex.new + @channels = Hash.new - @continuations = ::Queue.new + @continuations = ::Queue.new end def hostname; self.host; end def username; self.user; end def password; self.pass; end @@ -443,11 +449,16 @@ connection_tune = frame.decode_payload @frame_max = negotiate_value(@client_frame_max, connection_tune.frame_max) @channel_max = negotiate_value(@client_channel_max, connection_tune.channel_max) - @heartbeat = negotiate_value(@client_heartbeat, connection_tune.heartbeat) + # this allows for disabled heartbeats. MK. + @heartbeat = if 0 == @client_heartbeat || @client_heartbeat.nil? + 0 + else + negotiate_value(@client_heartbeat, connection_tune.heartbeat) + end @channel_id_allocator = ChannelIdAllocator.new(@channel_max) @transport.send_frame(AMQ::Protocol::Connection::TuneOk.encode(@channel_max, @frame_max, @heartbeat)) @transport.send_frame(AMQ::Protocol::Connection::Open.encode(self.vhost)) @@ -498,13 +509,16 @@ # @api plugin - # @see http://tools.ietf.org/rfc/rfc2595.txt RFC 2595 def encode_credentials(username, password) - "\0#{username}\0#{password}" + @credentials_encoder.encode_credentials(username, password) end # encode_credentials(username, password) + + def credentials_encoder_for(mechanism) + Authentication::CredentialsEncoder.for_session(self) + end end # Session # backwards compatibility Client = Session end