lib/bunny/session.rb in bunny-0.10.4 vs lib/bunny/session.rb in bunny-0.10.5

- old
+ new

@@ -95,22 +95,27 @@ # @option connection_string_or_opts [Integer] :port (5672) Port RabbitMQ listens on # @option connection_string_or_opts [String] :username ("guest") Username # @option connection_string_or_opts [String] :password ("guest") Password # @option connection_string_or_opts [String] :vhost ("/") Virtual host to use # @option connection_string_or_opts [Integer] :heartbeat (600) Heartbeat interval. 0 means no heartbeat. + # @option connection_string_or_opts [Boolean] :tls (false) Should TLS/SSL be used? + # @option connection_string_or_opts [String] :tls_cert (nil) Path to client TLS/SSL certificate file (.pem) + # @option connection_string_or_opts [String] :tls_key (nil) Path to client TLS/SSL private key file (.pem) + # @option connection_string_or_opts [Array<String>] :tls_ca_certificates Array of paths to TLS/SSL CA files (.pem), by default detected from OpenSSL configuration # # @option optz [String] :auth_mechanism ("PLAIN") Authentication mechanism, PLAIN or EXTERNAL # @option optz [String] :locale ("PLAIN") Locale RabbitMQ should use # # @see http://rubybunny.info/articles/connecting.html Connecting to RabbitMQ guide + # @see http://rubybunny.info/articles/tls.html TLS/SSL guide # @api public def initialize(connection_string_or_opts = Hash.new, optz = Hash.new) opts = case (ENV["RABBITMQ_URL"] || connection_string_or_opts) when nil then Hash.new when String then - AMQ::Settings.parse_amqp_url(connection_string_or_opts) + self.class.parse_uri(connection_string_or_opts) when Hash then connection_string_or_opts end.merge(optz) @opts = opts @@ -191,10 +196,12 @@ end # @private attr_reader :mutex_impl + # Provides a way to fine tune the socket used by connection. + # Accepts a block that the socket will be yielded to. def configure_socket(&block) raise ArgumentError, "No block provided!" if block.nil? @transport.configure_socket(&block) end @@ -263,11 +270,11 @@ # Closes the connection. This involves closing all of its channels. def close if @transport.open? close_all_channels - Bunny::Timer.timeout(@transport.disconnect_timeout, ClientTimeout) do + Bunny::Timeout.timeout(@transport.disconnect_timeout, ClientTimeout) do self.close_connection(true) end maybe_shutdown_reader_loop close_transport @@ -340,12 +347,47 @@ # @private def exchange(*args) @default_channel.exchange(*args) end + # Defines a callback that will be executed when RabbitMQ blocks the connection + # because it is running low on memory or disk space (as configured via config file + # and/or rabbitmqctl). + # + # @yield [AMQ::Protocol::Connection::Blocked] connection.blocked method which provides a reason for blocking + # + # @api public + def on_blocked(&block) + @block_callback = block + end + # Defines a callback that will be executed when RabbitMQ unblocks the connection + # that was previously blocked, e.g. because the memory or disk space alarm has cleared. # + # @see #on_blocked + # @api public + def on_unblocked(&block) + @unblock_callback = block + end + + # @return [Boolean] true if the connection is currently blocked by RabbitMQ because it's running low on + # RAM, disk space, or other resource; false otherwise + # @see #on_blocked + # @see #on_unblocked + def blocked? + @blocked + end + + # Parses an amqp[s] URI into a hash that {Bunny::Session#initialize} accepts. + # + # @param [String] uri amqp or amqps URI to parse + # @return [Hash] Parsed URI as a hash + def self.parse_uri(uri) + AMQ::Settings.parse_amqp_url(uri) + end + + # # Implementation # # @private def open_channel(ch) @@ -374,11 +416,11 @@ end # @private def close_all_channels @channels.reject {|n, ch| n == 0 || !ch.open? }.each do |_, ch| - Bunny::Timer.timeout(@transport.disconnect_timeout, ClientTimeout) { ch.close } + Bunny::Timeout.timeout(@transport.disconnect_timeout, ClientTimeout) { ch.close } end end # @private def close_connection(sync = true) @@ -633,10 +675,10 @@ # joining the thread here may take forever # on JRuby because sun.nio.ch.KQueueArrayWrapper#kevent0 is # a native method that cannot be (easily) interrupted. # So we use this ugly hack or else our test suite takes forever # to run on JRuby (a new connection is opened/closed per example). MK. - if RUBY_ENGINE == "jruby" + if defined?(JRUBY_VERSION) sleep 0.075 else @reader_loop.join end else