lib/bunny/session.rb in bunny-0.9.0.pre2 vs lib/bunny/session.rb in bunny-0.9.0.pre3

- old
+ new

@@ -28,15 +28,20 @@ CONNECT_TIMEOUT = Transport::DEFAULT_CONNECTION_TIMEOUT DEFAULT_CLIENT_PROPERTIES = { # once we support AMQP 0.9.1 extensions, this needs to be updated. MK. - :capabilities => {}, + :capabilities => { + # :publisher_confirms => true, + :consumer_cancel_notify => true, + :exchange_exchange_bindings => true, + :"basic.nack" => true + }, :product => "Bunny", :platform => ::RUBY_DESCRIPTION, :version => Bunny::VERSION, - :information => "http://github.com/ruby-amqp/bunny" + :information => "http://github.com/ruby-amqp/bunny", } DEFAULT_LOCALE = "en_GB" @@ -48,11 +53,13 @@ attr_reader :server_capabilities, :server_properties, :server_authentication_mechanisms, :server_locales attr_reader :default_channel attr_reader :channel_id_allocator def initialize(connection_string_or_opts = Hash.new, optz = Hash.new) - opts = case connection_string_or_opts + opts = case (ENV["RABBITMQ_URL"] || connection_string_or_opts) + when nil then + Hash.new when String then AMQ::Settings.parse_amqp_url(connection_string_or_opts) when Hash then connection_string_or_opts end.merge(optz) @@ -64,27 +71,23 @@ @pass = self.password_from(opts) @vhost = self.vhost_from(opts) @logfile = opts[:logfile] @logging = opts[:logging] || false - @status = :not_connected - @frame_max = opts[:frame_max] || DEFAULT_FRAME_MAX - # currently ignored - @channel_max = opts[:channel_max] || 0 - @heartbeat = self.heartbeat_from(opts) + @status = :not_connected + # these are negotiated with the broker during the connection tuning phase + @client_frame_max = opts.fetch(:frame_max, DEFAULT_FRAME_MAX) + @client_channel_max = opts.fetch(:channel_max, 65536) + @client_heartbeat = self.heartbeat_from(opts) + @client_properties = opts[:properties] || DEFAULT_CLIENT_PROPERTIES @mechanism = "PLAIN" @locale = @opts.fetch(:locale, DEFAULT_LOCALE) + @channel_mutex = Mutex.new + @channels = Hash.new - @channel_id_allocator = ChannelIdAllocator.new - @channel_mutex = Mutex.new - @channels = Hash.new - - # Create channel 0 - @channel0 = Bunny::Channel.new(self, 0) - @continuations = ::Queue.new end def hostname; self.host; end def username; self.user; end @@ -99,14 +102,10 @@ def uses_ssl? @transport.uses_ssl? end alias ssl? uses_ssl? - def channel0 - @channel0 - end - def start @status = :connecting self.initialize_transport @@ -440,14 +439,18 @@ if frame.nil? @state = :closed raise Bunny::PossibleAuthenticationFailureError.new(self.user, self.vhost, self.password.size) end - connection_tune = frame.decode_payload - @frame_max = connection_tune.frame_max.freeze - @heartbeat ||= connection_tune.heartbeat + connection_tune = frame.decode_payload + @frame_max = negotiate_value(@client_frame_max, connection_tune.frame_max) + @channel_max = negotiate_value(@client_channel_max, connection_tune.channel_max) + @heartbeat = negotiate_value(@client_heartbeat, connection_tune.heartbeat) + + @channel_id_allocator = ChannelIdAllocator.new(@channel_max) + @transport.send_frame(AMQ::Protocol::Connection::TuneOk.encode(@channel_max, @frame_max, @heartbeat)) @transport.send_frame(AMQ::Protocol::Connection::Open.encode(self.vhost)) frame2 = begin @transport.read_next_frame @@ -466,9 +469,17 @@ if @heartbeat && @heartbeat > 0 initialize_heartbeat_sender end raise "could not open connection: server did not respond with connection.open-ok" unless connection_open_ok.is_a?(AMQ::Protocol::Connection::OpenOk) + end + + def negotiate_value(client_value, server_value) + if client_value == 0 || server_value == 0 + [client_value, server_value].max + else + [client_value, server_value].min + end end def initialize_heartbeat_sender @heartbeat_sender = HeartbeatSender.new(@transport) @heartbeat_sender.start(@heartbeat)