lib/bunny/session.rb in bunny-0.10.8 vs lib/bunny/session.rb in bunny-1.0.0.pre1
- old
+ new
@@ -1,8 +1,7 @@
require "socket"
require "thread"
-require "monitor"
require "bunny/transport"
require "bunny/channel_id_allocator"
require "bunny/heartbeat_sender"
require "bunny/reader_loop"
@@ -95,27 +94,22 @@
# @option connection_string_or_opts [Integer] :port (5672) Port RabbitMQ listens on
# @option connection_string_or_opts [String] :username ("guest") Username
# @option connection_string_or_opts [String] :password ("guest") Password
# @option connection_string_or_opts [String] :vhost ("/") Virtual host to use
# @option connection_string_or_opts [Integer] :heartbeat (600) Heartbeat interval. 0 means no heartbeat.
- # @option connection_string_or_opts [Boolean] :tls (false) Should TLS/SSL be used?
- # @option connection_string_or_opts [String] :tls_cert (nil) Path to client TLS/SSL certificate file (.pem)
- # @option connection_string_or_opts [String] :tls_key (nil) Path to client TLS/SSL private key file (.pem)
- # @option connection_string_or_opts [Array<String>] :tls_ca_certificates Array of paths to TLS/SSL CA files (.pem), by default detected from OpenSSL configuration
#
# @option optz [String] :auth_mechanism ("PLAIN") Authentication mechanism, PLAIN or EXTERNAL
# @option optz [String] :locale ("PLAIN") Locale RabbitMQ should use
#
# @see http://rubybunny.info/articles/connecting.html Connecting to RabbitMQ guide
- # @see http://rubybunny.info/articles/tls.html TLS/SSL guide
# @api public
def initialize(connection_string_or_opts = Hash.new, optz = Hash.new)
opts = case (ENV["RABBITMQ_URL"] || connection_string_or_opts)
when nil then
Hash.new
when String then
- self.class.parse_uri(ENV["RABBITMQ_URL"] || connection_string_or_opts)
+ AMQ::Settings.parse_amqp_url(connection_string_or_opts)
when Hash then
connection_string_or_opts
end.merge(optz)
@opts = opts
@@ -148,18 +142,15 @@
@client_properties = opts[:properties] || DEFAULT_CLIENT_PROPERTIES
@mechanism = opts.fetch(:auth_mechanism, "PLAIN")
@credentials_encoder = credentials_encoder_for(@mechanism)
@locale = @opts.fetch(:locale, DEFAULT_LOCALE)
-
- @mutex_impl = @opts.fetch(:mutex_impl, Monitor)
-
# mutex for the channel id => channel hash
- @channel_mutex = @mutex_impl.new
+ @channel_mutex = Mutex.new
# transport operations/continuations mutex. A workaround for
# the non-reentrant Ruby mutexes. MK.
- @transport_mutex = @mutex_impl.new
+ @transport_mutex = Mutex.new
@channels = Hash.new
@origin_thread = Thread.current
self.reset_continuations
@@ -193,15 +184,10 @@
# @return [Boolean] true if this connection uses a separate thread for I/O activity
def threaded?
@threaded
end
- # @private
- attr_reader :mutex_impl
-
- # Provides a way to fine tune the socket used by connection.
- # Accepts a block that the socket will be yielded to.
def configure_socket(&block)
raise ArgumentError, "No block provided!" if block.nil?
@transport.configure_socket(&block)
end
@@ -231,11 +217,11 @@
self.init_connection
self.open_connection
@reader_loop = nil
- self.start_reader_loop if threaded?
+ self.start_reader_loop if @threaded
@default_channel = self.create_channel
rescue Exception => e
@status = :not_connected
raise e
@@ -270,18 +256,13 @@
# Closes the connection. This involves closing all of its channels.
def close
if @transport.open?
close_all_channels
- Bunny::Timeout.timeout(@transport.disconnect_timeout, ClientTimeout) do
- self.close_connection(true)
+ Bunny::Timer.timeout(@transport.disconnect_timeout, ClientTimeout) do
+ self.close_connection(false)
end
-
- maybe_shutdown_reader_loop
- close_transport
-
- @status = :closed
end
end
alias stop close
# Creates a temporary channel, yields it to the block given to this
@@ -347,47 +328,12 @@
# @private
def exchange(*args)
@default_channel.exchange(*args)
end
- # Defines a callback that will be executed when RabbitMQ blocks the connection
- # because it is running low on memory or disk space (as configured via config file
- # and/or rabbitmqctl).
- #
- # @yield [AMQ::Protocol::Connection::Blocked] connection.blocked method which provides a reason for blocking
- #
- # @api public
- def on_blocked(&block)
- @block_callback = block
- end
- # Defines a callback that will be executed when RabbitMQ unblocks the connection
- # that was previously blocked, e.g. because the memory or disk space alarm has cleared.
#
- # @see #on_blocked
- # @api public
- def on_unblocked(&block)
- @unblock_callback = block
- end
-
- # @return [Boolean] true if the connection is currently blocked by RabbitMQ because it's running low on
- # RAM, disk space, or other resource; false otherwise
- # @see #on_blocked
- # @see #on_unblocked
- def blocked?
- @blocked
- end
-
- # Parses an amqp[s] URI into a hash that {Bunny::Session#initialize} accepts.
- #
- # @param [String] uri amqp or amqps URI to parse
- # @return [Hash] Parsed URI as a hash
- def self.parse_uri(uri)
- AMQ::Settings.parse_amqp_url(uri)
- end
-
- #
# Implementation
#
# @private
def open_channel(ch)
@@ -416,25 +362,23 @@
end
# @private
def close_all_channels
@channels.reject {|n, ch| n == 0 || !ch.open? }.each do |_, ch|
- Bunny::Timeout.timeout(@transport.disconnect_timeout, ClientTimeout) { ch.close }
+ Bunny::Timer.timeout(@transport.disconnect_timeout, ClientTimeout) { ch.close }
end
end
# @private
def close_connection(sync = true)
- if @transport.open?
- @transport.send_frame(AMQ::Protocol::Connection::Close.encode(200, "Goodbye", 0, 0))
+ @transport.send_frame(AMQ::Protocol::Connection::Close.encode(200, "Goodbye", 0, 0))
- maybe_shutdown_heartbeat_sender
- @status = :not_connected
+ maybe_shutdown_heartbeat_sender
+ @status = :not_connected
- if sync
- @last_connection_close_ok = wait_on_continuations
- end
+ if sync
+ @last_connection_close_ok = wait_on_continuations
end
end
# @private
def handle_frame(ch_number, method)
@@ -446,15 +390,20 @@
@continuations.push(method)
when AMQ::Protocol::Connection::Close then
@last_connection_error = instantiate_connection_level_exception(method)
@continuations.push(method)
- @origin_thread.raise(@last_connection_error)
+ raise @last_connection_error
when AMQ::Protocol::Connection::CloseOk then
@last_connection_close_ok = method
begin
@continuations.clear
+
+ reader_loop.stop
+ @reader_loop = nil
+
+ @transport.close
rescue StandardError => e
@logger.error e.class.name
@logger.error e.message
@logger.error e.backtrace
ensure
@@ -664,43 +613,10 @@
@reader_loop ||= ReaderLoop.new(@transport, self, Thread.current)
end
# @private
def maybe_shutdown_reader_loop
- if @reader_loop
- @reader_loop.stop
- if threaded?
- # this is the easiest way to wait until the loop
- # is guaranteed to have terminated
- @reader_loop.raise(ShutdownSignal)
- # joining the thread here may take forever
- # on JRuby because sun.nio.ch.KQueueArrayWrapper#kevent0 is
- # a native method that cannot be (easily) interrupted.
- # So we use this ugly hack or else our test suite takes forever
- # to run on JRuby (a new connection is opened/closed per example). MK.
- if defined?(JRUBY_VERSION)
- sleep 0.075
- else
- @reader_loop.join
- end
- else
- # single threaded mode, nothing to do. MK.
- end
- end
-
- @reader_loop = nil
- end
-
- # @private
- def close_transport
- begin
- @transport.close
- rescue StandardError => e
- @logger.error "Exception when closing transport:"
- @logger.error e.class.name
- @logger.error e.message
- @logger.error e.backtrace
- end
+ @reader_loop.stop if @reader_loop
end
# @private
def signal_activity!
@heartbeat_sender.signal_activity! if @heartbeat_sender