lib/bunny/session.rb in bunny-0.9.0.pre2 vs lib/bunny/session.rb in bunny-0.9.0.pre3
- old
+ new
@@ -28,15 +28,20 @@
CONNECT_TIMEOUT = Transport::DEFAULT_CONNECTION_TIMEOUT
DEFAULT_CLIENT_PROPERTIES = {
# once we support AMQP 0.9.1 extensions, this needs to be updated. MK.
- :capabilities => {},
+ :capabilities => {
+ # :publisher_confirms => true,
+ :consumer_cancel_notify => true,
+ :exchange_exchange_bindings => true,
+ :"basic.nack" => true
+ },
:product => "Bunny",
:platform => ::RUBY_DESCRIPTION,
:version => Bunny::VERSION,
- :information => "http://github.com/ruby-amqp/bunny"
+ :information => "http://github.com/ruby-amqp/bunny",
}
DEFAULT_LOCALE = "en_GB"
@@ -48,11 +53,13 @@
attr_reader :server_capabilities, :server_properties, :server_authentication_mechanisms, :server_locales
attr_reader :default_channel
attr_reader :channel_id_allocator
def initialize(connection_string_or_opts = Hash.new, optz = Hash.new)
- opts = case connection_string_or_opts
+ opts = case (ENV["RABBITMQ_URL"] || connection_string_or_opts)
+ when nil then
+ Hash.new
when String then
AMQ::Settings.parse_amqp_url(connection_string_or_opts)
when Hash then
connection_string_or_opts
end.merge(optz)
@@ -64,27 +71,23 @@
@pass = self.password_from(opts)
@vhost = self.vhost_from(opts)
@logfile = opts[:logfile]
@logging = opts[:logging] || false
- @status = :not_connected
- @frame_max = opts[:frame_max] || DEFAULT_FRAME_MAX
- # currently ignored
- @channel_max = opts[:channel_max] || 0
- @heartbeat = self.heartbeat_from(opts)
+ @status = :not_connected
+ # these are negotiated with the broker during the connection tuning phase
+ @client_frame_max = opts.fetch(:frame_max, DEFAULT_FRAME_MAX)
+ @client_channel_max = opts.fetch(:channel_max, 65536)
+ @client_heartbeat = self.heartbeat_from(opts)
+
@client_properties = opts[:properties] || DEFAULT_CLIENT_PROPERTIES
@mechanism = "PLAIN"
@locale = @opts.fetch(:locale, DEFAULT_LOCALE)
+ @channel_mutex = Mutex.new
+ @channels = Hash.new
- @channel_id_allocator = ChannelIdAllocator.new
- @channel_mutex = Mutex.new
- @channels = Hash.new
-
- # Create channel 0
- @channel0 = Bunny::Channel.new(self, 0)
-
@continuations = ::Queue.new
end
def hostname; self.host; end
def username; self.user; end
@@ -99,14 +102,10 @@
def uses_ssl?
@transport.uses_ssl?
end
alias ssl? uses_ssl?
- def channel0
- @channel0
- end
-
def start
@status = :connecting
self.initialize_transport
@@ -440,14 +439,18 @@
if frame.nil?
@state = :closed
raise Bunny::PossibleAuthenticationFailureError.new(self.user, self.vhost, self.password.size)
end
- connection_tune = frame.decode_payload
- @frame_max = connection_tune.frame_max.freeze
- @heartbeat ||= connection_tune.heartbeat
+ connection_tune = frame.decode_payload
+ @frame_max = negotiate_value(@client_frame_max, connection_tune.frame_max)
+ @channel_max = negotiate_value(@client_channel_max, connection_tune.channel_max)
+ @heartbeat = negotiate_value(@client_heartbeat, connection_tune.heartbeat)
+
+ @channel_id_allocator = ChannelIdAllocator.new(@channel_max)
+
@transport.send_frame(AMQ::Protocol::Connection::TuneOk.encode(@channel_max, @frame_max, @heartbeat))
@transport.send_frame(AMQ::Protocol::Connection::Open.encode(self.vhost))
frame2 = begin
@transport.read_next_frame
@@ -466,9 +469,17 @@
if @heartbeat && @heartbeat > 0
initialize_heartbeat_sender
end
raise "could not open connection: server did not respond with connection.open-ok" unless connection_open_ok.is_a?(AMQ::Protocol::Connection::OpenOk)
+ end
+
+ def negotiate_value(client_value, server_value)
+ if client_value == 0 || server_value == 0
+ [client_value, server_value].max
+ else
+ [client_value, server_value].min
+ end
end
def initialize_heartbeat_sender
@heartbeat_sender = HeartbeatSender.new(@transport)
@heartbeat_sender.start(@heartbeat)