lib/bunny/session.rb in bunny-0.10.4 vs lib/bunny/session.rb in bunny-0.10.5
- old
+ new
@@ -95,22 +95,27 @@
# @option connection_string_or_opts [Integer] :port (5672) Port RabbitMQ listens on
# @option connection_string_or_opts [String] :username ("guest") Username
# @option connection_string_or_opts [String] :password ("guest") Password
# @option connection_string_or_opts [String] :vhost ("/") Virtual host to use
# @option connection_string_or_opts [Integer] :heartbeat (600) Heartbeat interval. 0 means no heartbeat.
+ # @option connection_string_or_opts [Boolean] :tls (false) Should TLS/SSL be used?
+ # @option connection_string_or_opts [String] :tls_cert (nil) Path to client TLS/SSL certificate file (.pem)
+ # @option connection_string_or_opts [String] :tls_key (nil) Path to client TLS/SSL private key file (.pem)
+ # @option connection_string_or_opts [Array<String>] :tls_ca_certificates Array of paths to TLS/SSL CA files (.pem), by default detected from OpenSSL configuration
#
# @option optz [String] :auth_mechanism ("PLAIN") Authentication mechanism, PLAIN or EXTERNAL
# @option optz [String] :locale ("PLAIN") Locale RabbitMQ should use
#
# @see http://rubybunny.info/articles/connecting.html Connecting to RabbitMQ guide
+ # @see http://rubybunny.info/articles/tls.html TLS/SSL guide
# @api public
def initialize(connection_string_or_opts = Hash.new, optz = Hash.new)
opts = case (ENV["RABBITMQ_URL"] || connection_string_or_opts)
when nil then
Hash.new
when String then
- AMQ::Settings.parse_amqp_url(connection_string_or_opts)
+ self.class.parse_uri(connection_string_or_opts)
when Hash then
connection_string_or_opts
end.merge(optz)
@opts = opts
@@ -191,10 +196,12 @@
end
# @private
attr_reader :mutex_impl
+ # Provides a way to fine tune the socket used by connection.
+ # Accepts a block that the socket will be yielded to.
def configure_socket(&block)
raise ArgumentError, "No block provided!" if block.nil?
@transport.configure_socket(&block)
end
@@ -263,11 +270,11 @@
# Closes the connection. This involves closing all of its channels.
def close
if @transport.open?
close_all_channels
- Bunny::Timer.timeout(@transport.disconnect_timeout, ClientTimeout) do
+ Bunny::Timeout.timeout(@transport.disconnect_timeout, ClientTimeout) do
self.close_connection(true)
end
maybe_shutdown_reader_loop
close_transport
@@ -340,12 +347,47 @@
# @private
def exchange(*args)
@default_channel.exchange(*args)
end
+ # Defines a callback that will be executed when RabbitMQ blocks the connection
+ # because it is running low on memory or disk space (as configured via config file
+ # and/or rabbitmqctl).
+ #
+ # @yield [AMQ::Protocol::Connection::Blocked] connection.blocked method which provides a reason for blocking
+ #
+ # @api public
+ def on_blocked(&block)
+ @block_callback = block
+ end
+ # Defines a callback that will be executed when RabbitMQ unblocks the connection
+ # that was previously blocked, e.g. because the memory or disk space alarm has cleared.
#
+ # @see #on_blocked
+ # @api public
+ def on_unblocked(&block)
+ @unblock_callback = block
+ end
+
+ # @return [Boolean] true if the connection is currently blocked by RabbitMQ because it's running low on
+ # RAM, disk space, or other resource; false otherwise
+ # @see #on_blocked
+ # @see #on_unblocked
+ def blocked?
+ @blocked
+ end
+
+ # Parses an amqp[s] URI into a hash that {Bunny::Session#initialize} accepts.
+ #
+ # @param [String] uri amqp or amqps URI to parse
+ # @return [Hash] Parsed URI as a hash
+ def self.parse_uri(uri)
+ AMQ::Settings.parse_amqp_url(uri)
+ end
+
+ #
# Implementation
#
# @private
def open_channel(ch)
@@ -374,11 +416,11 @@
end
# @private
def close_all_channels
@channels.reject {|n, ch| n == 0 || !ch.open? }.each do |_, ch|
- Bunny::Timer.timeout(@transport.disconnect_timeout, ClientTimeout) { ch.close }
+ Bunny::Timeout.timeout(@transport.disconnect_timeout, ClientTimeout) { ch.close }
end
end
# @private
def close_connection(sync = true)
@@ -633,10 +675,10 @@
# joining the thread here may take forever
# on JRuby because sun.nio.ch.KQueueArrayWrapper#kevent0 is
# a native method that cannot be (easily) interrupted.
# So we use this ugly hack or else our test suite takes forever
# to run on JRuby (a new connection is opened/closed per example). MK.
- if RUBY_ENGINE == "jruby"
+ if defined?(JRUBY_VERSION)
sleep 0.075
else
@reader_loop.join
end
else