lib/bunny/session.rb in bunny-0.9.0.pre6 vs lib/bunny/session.rb in bunny-0.9.0.pre7

- old
+ new

@@ -13,54 +13,76 @@ require "amq/protocol/client" require "amq/settings" module Bunny + # Represents AMQP 0.9.1 connection (connection to RabbitMQ). + # @see http://rubybunny.info/articles/connecting.html Connecting to RabbitMQ guide class Session + # Default host used for connection DEFAULT_HOST = "127.0.0.1" + # Default virtual host used for connection DEFAULT_VHOST = "/" + # Default username used for connection DEFAULT_USER = "guest" + # Default password used for connection DEFAULT_PASSWORD = "guest" - # the same value as RabbitMQ 3.0 uses. MK. + # Default heartbeat interval, the same value as RabbitMQ 3.0 uses. DEFAULT_HEARTBEAT = 600 - # 128K + # @private DEFAULT_FRAME_MAX = 131072 # backwards compatibility + # @private CONNECT_TIMEOUT = Transport::DEFAULT_CONNECTION_TIMEOUT - + # RabbitMQ client metadata DEFAULT_CLIENT_PROPERTIES = { :capabilities => { :publisher_confirms => true, :consumer_cancel_notify => true, :exchange_exchange_bindings => true, :"basic.nack" => true }, :product => "Bunny", :platform => ::RUBY_DESCRIPTION, :version => Bunny::VERSION, - :information => "http://github.com/ruby-amqp/bunny", + :information => "http://rubybunny.info", } DEFAULT_LOCALE = "en_GB" # # API # - attr_reader :status, :host, :port, :heartbeat, :user, :pass, :vhost, :frame_max + attr_reader :status, :host, :port, :heartbeat, :user, :pass, :vhost, :frame_max, :threaded attr_reader :server_capabilities, :server_properties, :server_authentication_mechanisms, :server_locales attr_reader :default_channel attr_reader :channel_id_allocator # Authentication mechanism, e.g. "PLAIN" or "EXTERNAL" # @return [String] attr_reader :mechanism + # @param [String, Hash] connection_string_or_opts Connection string or a hash of connection options + # @param [Hash] optz Extra options not related to connection + # + # @option connection_string_or_opts [String] :host ("127.0.0.1") Hostname or IP address to connect to + # @option connection_string_or_opts [Integer] :port (5672) Port RabbitMQ listens on + # @option connection_string_or_opts [String] :username ("guest") Username + # @option connection_string_or_opts [String] :password ("guest") Password + # @option connection_string_or_opts [String] :vhost ("/") Virtual host to use + # @option connection_string_or_opts [Integer] :heartbeat (600) Heartbeat interval. 0 means no heartbeat. + # + # @option optz [String] :auth_mechanism ("PLAIN") Authentication mechanism, PLAIN or EXTERNAL + # @option optz [String] :locale ("PLAIN") Locale RabbitMQ should use + # + # @see http://rubybunny.info/articles/connecting.html Connecting to RabbitMQ guide + # @api public def initialize(connection_string_or_opts = Hash.new, optz = Hash.new) opts = case (ENV["RABBITMQ_URL"] || connection_string_or_opts) when nil then Hash.new when String then @@ -75,10 +97,11 @@ @user = self.username_from(opts) @pass = self.password_from(opts) @vhost = self.vhost_from(opts) @logfile = opts[:logfile] @logging = opts[:logging] || false + @threaded = opts.fetch(:threaded, true) @status = :not_connected # these are negotiated with the broker during the connection tuning phase @client_frame_max = opts.fetch(:frame_max, DEFAULT_FRAME_MAX) @@ -93,41 +116,57 @@ @channels = Hash.new @continuations = ::Queue.new end + # @return [String] RabbitMQ hostname (or IP address) used def hostname; self.host; end + # @return [String] Username used def username; self.user; end + # @return [String] Password used def password; self.pass; end + # @return [String] Virtual host used def virtual_host; self.vhost; end + # @return [Boolean] true if this connection uses TLS (SSL) def uses_tls? @transport.uses_tls? end alias tls? uses_tls? + # @return [Boolean] true if this connection uses TLS (SSL) def uses_ssl? @transport.uses_ssl? end alias ssl? uses_ssl? + # Starts connection process + # @api public def start @continuations = ::Queue.new @status = :connecting self.initialize_transport self.init_connection self.open_connection @event_loop = nil - self.start_main_loop + self.start_main_loop if @threaded @default_channel = self.create_channel end + def read_write_timeout + @transport.read_write_timeout + end + # Opens a new channel and returns it. This method will block the calling + # thread until the response is received and the channel is guaranteed to be + # opened (this operation is very fast and inexpensive). + # + # @return [Bunny::Channel] Newly opened channel def create_channel(n = nil) if n && (ch = @channels[n]) ch else ch = Bunny::Channel.new(self, n) @@ -135,15 +174,16 @@ ch end end alias channel create_channel + # Closes the connection. This involves closing all of its channels. def close if @transport.open? close_all_channels - Bunny::Timer.timeout(@disconnect_timeout, ClientTimeout) do + Bunny::Timer.timeout(@transport.disconnect_timeout, ClientTimeout) do self.close_connection(false) end end end alias stop close @@ -164,107 +204,117 @@ def closed? status == :closed end def open? - status == :open || status == :connected + (status == :open || status == :connected) && @transport.open? end alias connected? open? # # Backwards compatibility # + # @private def queue(*args) @default_channel.queue(*args) end + # @private def direct(*args) @default_channel.direct(*args) end + # @private def fanout(*args) @default_channel.fanout(*args) end + # @private def topic(*args) @default_channel.topic(*args) end + # @private def headers(*args) @default_channel.headers(*args) end + # @private def exchange(*args) @default_channel.exchange(*args) end # # Implementation # - + # @private def open_channel(ch) n = ch.number self.register_channel(ch) @transport.send_frame(AMQ::Protocol::Channel::Open.encode(n, AMQ::Protocol::EMPTY_STRING)) - @last_channel_open_ok = @continuations.pop + @last_channel_open_ok = wait_on_continuations raise_if_continuation_resulted_in_a_connection_error! @last_channel_open_ok end + # @private def close_channel(ch) n = ch.number @transport.send_frame(AMQ::Protocol::Channel::Close.encode(n, 200, "Goodbye", 0, 0)) - @last_channel_close_ok = @continuations.pop + @last_channel_close_ok = wait_on_continuations raise_if_continuation_resulted_in_a_connection_error! self.unregister_channel(ch) @last_channel_close_ok end + # @private def close_all_channels @channels.reject {|n, ch| n == 0 || !ch.open? }.each do |_, ch| - Bunny::Timer.timeout(@disconnect_timeout, ClientTimeout) { ch.close } + Bunny::Timer.timeout(@transport.disconnect_timeout, ClientTimeout) { ch.close } end end + # @private def close_connection(sync = true) @transport.send_frame(AMQ::Protocol::Connection::Close.encode(200, "Goodbye", 0, 0)) maybe_shutdown_heartbeat_sender @status = :not_connected if sync - @last_connection_close_ok = @continuations.pop + @last_connection_close_ok = wait_on_continuations end end + # @private def handle_frame(ch_number, method) # puts "Session#handle_frame on #{ch_number}: #{method.inspect}" case method when AMQ::Protocol::Channel::OpenOk then @continuations.push(method) when AMQ::Protocol::Channel::CloseOk then @continuations.push(method) when AMQ::Protocol::Connection::Close then @last_connection_error = instantiate_connection_level_exception(method) - @contunuations.push(method) + @continuations.push(method) when AMQ::Protocol::Connection::CloseOk then @last_connection_close_ok = method begin @continuations.clear - @event_loop.stop + event_loop.stop @event_loop = nil @transport.close - rescue Exception => e + rescue StandardError => e puts e.class.name puts e.message puts e.backtrace ensure @active_continuation.notify_all if @active_continuation @@ -278,14 +328,19 @@ self.unregister_channel(ch) end when AMQ::Protocol::Basic::GetEmpty then @channels[ch_number].handle_basic_get_empty(method) else - @channels[ch_number].handle_method(method) + if ch = @channels[ch_number] + ch.handle_method(method) + else + # TODO: log a warning + end end end + # @private def raise_if_continuation_resulted_in_a_connection_error! raise @last_connection_error if @last_connection_error end def handle_frameset(ch_number, frames) @@ -301,11 +356,14 @@ else @channels[ch_number].handle_frameset(*frames) end end + # @private def handle_network_failure(exception) + raise NetworkErrorWrapper.new(exception) unless @threaded + if !recovering_from_network_failure? @recovering_from_network_failure = true if recoverable_network_failure?(exception) # puts "Recovering from a network failure..." @channels.each do |n, ch| @@ -318,19 +376,22 @@ # TODO: investigate if we can be a bit smarter here. MK. end end end + # @private def recoverable_network_failure?(exception) # TODO: investigate if we can be a bit smarter here. MK. true end + # @private def recovering_from_network_failure? @recovering_from_network_failure end + # @private def recover_from_network_failure begin # puts "About to start recovery..." start @@ -344,10 +405,11 @@ sleep 5.0 retry if recoverable_network_failure?(e) end end + # @private def recover_channels # default channel is reopened right after connection # negotiation is completed, so make sure we do not try to open # it twice. MK. @channels.reject { |n, ch| ch == @default_channel }.each do |n, ch| @@ -355,14 +417,16 @@ ch.recover_from_network_failure end end + # @private def send_raw(*args) @transport.write(*args) end + # @private def instantiate_connection_level_exception(frame) case frame when AMQ::Protocol::Connection::Close then klass = case frame.reply_code when 503 then @@ -377,68 +441,84 @@ klass.new("Connection-level error: #{frame.reply_text}", self, frame) end end + # @private def hostname_from(options) options[:host] || options[:hostname] || DEFAULT_HOST end + # @private def port_from(options) fallback = if options[:tls] || options[:ssl] AMQ::Protocol::TLS_PORT else AMQ::Protocol::DEFAULT_PORT end options.fetch(:port, fallback) end + # @private def vhost_from(options) options[:virtual_host] || options[:vhost] || DEFAULT_VHOST end + # @private def username_from(options) options[:username] || options[:user] || DEFAULT_USER end + # @private def password_from(options) options[:password] || options[:pass] || options [:pwd] || DEFAULT_PASSWORD end + # @private def heartbeat_from(options) options[:heartbeat] || options[:heartbeat_interval] || options[:requested_heartbeat] || DEFAULT_HEARTBEAT end + # @private def next_channel_id @channel_id_allocator.next_channel_id end + # @private def release_channel_id(i) @channel_id_allocator.release_channel_id(i) end + # @private def register_channel(ch) @channel_mutex.synchronize do @channels[ch.number] = ch end end + # @private def unregister_channel(ch) @channel_mutex.synchronize do n = ch.number self.release_channel_id(n) @channels.delete(ch.number) end end + # @private def start_main_loop - @event_loop = MainLoop.new(@transport, self) - @event_loop.start + event_loop.start end + # @private + def event_loop + @event_loop ||= MainLoop.new(@transport, self) + end + + # @private def signal_activity! @heartbeat_sender.signal_activity! if @heartbeat_sender end @@ -456,11 +536,11 @@ end # Sends multiple frames, one by one. For thread safety this method takes a channel # object and synchronizes on it. # - # @api public + # @api private def send_frameset(frames, channel) # some developers end up sharing channels between threads and when multiple # threads publish on the same channel aggressively, at some point frames will be # delivered out of order and broker will raise 505 UNEXPECTED_FRAME exception. # If we synchronize on the channel, however, this is both thread safe and pretty fine-grained @@ -471,10 +551,11 @@ end end # send_frameset(frames) protected + # @api private def init_connection self.send_preamble connection_start = @transport.read_next_frame.decode_payload @@ -485,10 +566,11 @@ @server_locales = Array(connection_start.locales) @status = :connected end + # @api private def open_connection @transport.send_frame(AMQ::Protocol::Connection::StartOk.encode(@client_properties, @mechanism, self.encode_credentials(username, password), @locale)) frame = begin @transport.read_next_frame @@ -537,47 +619,59 @@ end raise "could not open connection: server did not respond with connection.open-ok" unless connection_open_ok.is_a?(AMQ::Protocol::Connection::OpenOk) end + # @api private def negotiate_value(client_value, server_value) if client_value == 0 || server_value == 0 [client_value, server_value].max else [client_value, server_value].min end end + # @api private def initialize_heartbeat_sender # puts "Initializing heartbeat sender..." @heartbeat_sender = HeartbeatSender.new(@transport) @heartbeat_sender.start(@heartbeat) end + # @api private def maybe_shutdown_heartbeat_sender @heartbeat_sender.stop if @heartbeat_sender end - + # @api private def initialize_transport @transport = Transport.new(self, @host, @port, @opts) end # Sends AMQ protocol header (also known as preamble). + # @api private def send_preamble @transport.send_raw(AMQ::Protocol::PREAMBLE) end - - - # @api plugin + # @api private def encode_credentials(username, password) @credentials_encoder.encode_credentials(username, password) end # encode_credentials(username, password) + # @api private def credentials_encoder_for(mechanism) Authentication::CredentialsEncoder.for_session(self) + end + + # @api private + def wait_on_continuations + unless @threaded + event_loop.run_once until @continuations.length > 0 + end + + @continuations.pop end end # Session # backwards compatibility Client = Session