lib/bunny/session.rb in bunny-0.10.8 vs lib/bunny/session.rb in bunny-1.0.0.pre1

- old
+ new

@@ -1,8 +1,7 @@ require "socket" require "thread" -require "monitor" require "bunny/transport" require "bunny/channel_id_allocator" require "bunny/heartbeat_sender" require "bunny/reader_loop" @@ -95,27 +94,22 @@ # @option connection_string_or_opts [Integer] :port (5672) Port RabbitMQ listens on # @option connection_string_or_opts [String] :username ("guest") Username # @option connection_string_or_opts [String] :password ("guest") Password # @option connection_string_or_opts [String] :vhost ("/") Virtual host to use # @option connection_string_or_opts [Integer] :heartbeat (600) Heartbeat interval. 0 means no heartbeat. - # @option connection_string_or_opts [Boolean] :tls (false) Should TLS/SSL be used? - # @option connection_string_or_opts [String] :tls_cert (nil) Path to client TLS/SSL certificate file (.pem) - # @option connection_string_or_opts [String] :tls_key (nil) Path to client TLS/SSL private key file (.pem) - # @option connection_string_or_opts [Array<String>] :tls_ca_certificates Array of paths to TLS/SSL CA files (.pem), by default detected from OpenSSL configuration # # @option optz [String] :auth_mechanism ("PLAIN") Authentication mechanism, PLAIN or EXTERNAL # @option optz [String] :locale ("PLAIN") Locale RabbitMQ should use # # @see http://rubybunny.info/articles/connecting.html Connecting to RabbitMQ guide - # @see http://rubybunny.info/articles/tls.html TLS/SSL guide # @api public def initialize(connection_string_or_opts = Hash.new, optz = Hash.new) opts = case (ENV["RABBITMQ_URL"] || connection_string_or_opts) when nil then Hash.new when String then - self.class.parse_uri(ENV["RABBITMQ_URL"] || connection_string_or_opts) + AMQ::Settings.parse_amqp_url(connection_string_or_opts) when Hash then connection_string_or_opts end.merge(optz) @opts = opts @@ -148,18 +142,15 @@ @client_properties = opts[:properties] || DEFAULT_CLIENT_PROPERTIES @mechanism = opts.fetch(:auth_mechanism, "PLAIN") @credentials_encoder = credentials_encoder_for(@mechanism) @locale = @opts.fetch(:locale, DEFAULT_LOCALE) - - @mutex_impl = @opts.fetch(:mutex_impl, Monitor) - # mutex for the channel id => channel hash - @channel_mutex = @mutex_impl.new + @channel_mutex = Mutex.new # transport operations/continuations mutex. A workaround for # the non-reentrant Ruby mutexes. MK. - @transport_mutex = @mutex_impl.new + @transport_mutex = Mutex.new @channels = Hash.new @origin_thread = Thread.current self.reset_continuations @@ -193,15 +184,10 @@ # @return [Boolean] true if this connection uses a separate thread for I/O activity def threaded? @threaded end - # @private - attr_reader :mutex_impl - - # Provides a way to fine tune the socket used by connection. - # Accepts a block that the socket will be yielded to. def configure_socket(&block) raise ArgumentError, "No block provided!" if block.nil? @transport.configure_socket(&block) end @@ -231,11 +217,11 @@ self.init_connection self.open_connection @reader_loop = nil - self.start_reader_loop if threaded? + self.start_reader_loop if @threaded @default_channel = self.create_channel rescue Exception => e @status = :not_connected raise e @@ -270,18 +256,13 @@ # Closes the connection. This involves closing all of its channels. def close if @transport.open? close_all_channels - Bunny::Timeout.timeout(@transport.disconnect_timeout, ClientTimeout) do - self.close_connection(true) + Bunny::Timer.timeout(@transport.disconnect_timeout, ClientTimeout) do + self.close_connection(false) end - - maybe_shutdown_reader_loop - close_transport - - @status = :closed end end alias stop close # Creates a temporary channel, yields it to the block given to this @@ -347,47 +328,12 @@ # @private def exchange(*args) @default_channel.exchange(*args) end - # Defines a callback that will be executed when RabbitMQ blocks the connection - # because it is running low on memory or disk space (as configured via config file - # and/or rabbitmqctl). - # - # @yield [AMQ::Protocol::Connection::Blocked] connection.blocked method which provides a reason for blocking - # - # @api public - def on_blocked(&block) - @block_callback = block - end - # Defines a callback that will be executed when RabbitMQ unblocks the connection - # that was previously blocked, e.g. because the memory or disk space alarm has cleared. # - # @see #on_blocked - # @api public - def on_unblocked(&block) - @unblock_callback = block - end - - # @return [Boolean] true if the connection is currently blocked by RabbitMQ because it's running low on - # RAM, disk space, or other resource; false otherwise - # @see #on_blocked - # @see #on_unblocked - def blocked? - @blocked - end - - # Parses an amqp[s] URI into a hash that {Bunny::Session#initialize} accepts. - # - # @param [String] uri amqp or amqps URI to parse - # @return [Hash] Parsed URI as a hash - def self.parse_uri(uri) - AMQ::Settings.parse_amqp_url(uri) - end - - # # Implementation # # @private def open_channel(ch) @@ -416,25 +362,23 @@ end # @private def close_all_channels @channels.reject {|n, ch| n == 0 || !ch.open? }.each do |_, ch| - Bunny::Timeout.timeout(@transport.disconnect_timeout, ClientTimeout) { ch.close } + Bunny::Timer.timeout(@transport.disconnect_timeout, ClientTimeout) { ch.close } end end # @private def close_connection(sync = true) - if @transport.open? - @transport.send_frame(AMQ::Protocol::Connection::Close.encode(200, "Goodbye", 0, 0)) + @transport.send_frame(AMQ::Protocol::Connection::Close.encode(200, "Goodbye", 0, 0)) - maybe_shutdown_heartbeat_sender - @status = :not_connected + maybe_shutdown_heartbeat_sender + @status = :not_connected - if sync - @last_connection_close_ok = wait_on_continuations - end + if sync + @last_connection_close_ok = wait_on_continuations end end # @private def handle_frame(ch_number, method) @@ -446,15 +390,20 @@ @continuations.push(method) when AMQ::Protocol::Connection::Close then @last_connection_error = instantiate_connection_level_exception(method) @continuations.push(method) - @origin_thread.raise(@last_connection_error) + raise @last_connection_error when AMQ::Protocol::Connection::CloseOk then @last_connection_close_ok = method begin @continuations.clear + + reader_loop.stop + @reader_loop = nil + + @transport.close rescue StandardError => e @logger.error e.class.name @logger.error e.message @logger.error e.backtrace ensure @@ -664,43 +613,10 @@ @reader_loop ||= ReaderLoop.new(@transport, self, Thread.current) end # @private def maybe_shutdown_reader_loop - if @reader_loop - @reader_loop.stop - if threaded? - # this is the easiest way to wait until the loop - # is guaranteed to have terminated - @reader_loop.raise(ShutdownSignal) - # joining the thread here may take forever - # on JRuby because sun.nio.ch.KQueueArrayWrapper#kevent0 is - # a native method that cannot be (easily) interrupted. - # So we use this ugly hack or else our test suite takes forever - # to run on JRuby (a new connection is opened/closed per example). MK. - if defined?(JRUBY_VERSION) - sleep 0.075 - else - @reader_loop.join - end - else - # single threaded mode, nothing to do. MK. - end - end - - @reader_loop = nil - end - - # @private - def close_transport - begin - @transport.close - rescue StandardError => e - @logger.error "Exception when closing transport:" - @logger.error e.class.name - @logger.error e.message - @logger.error e.backtrace - end + @reader_loop.stop if @reader_loop end # @private def signal_activity! @heartbeat_sender.signal_activity! if @heartbeat_sender