lib/bunny/session.rb in bunny-1.0.0.pre3 vs lib/bunny/session.rb in bunny-1.0.0.pre4

- old
+ new

@@ -52,11 +52,12 @@ DEFAULT_CLIENT_PROPERTIES = { :capabilities => { :publisher_confirms => true, :consumer_cancel_notify => true, :exchange_exchange_bindings => true, - :"basic.nack" => true + :"basic.nack" => true, + :"connection.blocked" => true }, :product => "Bunny", :platform => ::RUBY_DESCRIPTION, :version => Bunny::VERSION, :information => "http://rubybunny.info", @@ -133,10 +134,11 @@ @network_recovery_interval = opts.fetch(:network_recovery_interval, DEFAULT_NETWORK_RECOVERY_INTERVAL) # in ms @continuation_timeout = opts.fetch(:continuation_timeout, DEFAULT_CONTINUATION_TIMEOUT) @status = :not_connected + @blocked = false # these are negotiated with the broker during the connection tuning phase @client_frame_max = opts.fetch(:frame_max, DEFAULT_FRAME_MAX) @client_channel_max = opts.fetch(:channel_max, 65536) @client_heartbeat = self.heartbeat_from(opts) @@ -206,10 +208,13 @@ # @api public def start return self if connected? @status = :connecting + # reset here for cases when automatic network recovery kicks in + # when we were blocked. MK. + @blocked = false self.reset_continuations begin # close existing transport if we have one, # to not leak sockets @@ -224,11 +229,11 @@ self.init_connection self.open_connection @reader_loop = nil - self.start_reader_loop if @threaded + self.start_reader_loop if threaded? @default_channel = self.create_channel rescue Exception => e @status = :not_connected raise e @@ -340,12 +345,40 @@ # @private def exchange(*args) @default_channel.exchange(*args) end + # Defines a callback that will be executed when RabbitMQ blocks the connection + # because it is running low on memory or disk space (as configured via config file + # and/or rabbitmqctl). + # + # @yield [AMQ::Protocol::Connection::Blocked] connection.blocked method which provides a reason for blocking + # + # @api public + def on_blocked(&block) + @block_callback = block + end + # Defines a callback that will be executed when RabbitMQ unblocks the connection + # that was previously blocked, e.g. because the memory or disk space alarm has cleared. # + # @see #on_blocked + # @api public + def on_unblocked(&block) + @unblock_callback = block + end + + # @return [Boolean] true if the connection is currently blocked by RabbitMQ because it's running low on + # RAM, disk space, or other resource; false otherwise + # @see #on_blocked + # @see #on_unblocked + def blocked? + @blocked + end + + + # # Implementation # # @private def open_channel(ch) @@ -416,10 +449,16 @@ @logger.error e.message @logger.error e.backtrace ensure @continuations.push(:__unblock__) end + when AMQ::Protocol::Connection::Blocked then + @blocked = true + @block_callback.call(method) if @block_callback + when AMQ::Protocol::Connection::Unblocked then + @blocked = false + @unblock_callback.call(method) if @unblock_callback when AMQ::Protocol::Channel::Close then begin ch = @channels[ch_number] ch.handle_method(method) ensure @@ -624,13 +663,26 @@ # @private def maybe_shutdown_reader_loop if @reader_loop @reader_loop.stop - # We don't need to kill the loop but - # this is the easiest way to wait until the loop - # is guaranteed to have terminated - @reader_loop.kill + if threaded? + # this is the easiest way to wait until the loop + # is guaranteed to have terminated + @reader_loop.raise(ShutdownSignal) + # joining the thread here may take forever + # on JRuby because sun.nio.ch.KQueueArrayWrapper#kevent0 is + # a native method that cannot be (easily) interrupted. + # So we use this ugly hack or else our test suite takes forever + # to run on JRuby (a new connection is opened/closed per example). MK. + if RUBY_ENGINE == "jruby" + sleep 0.075 + else + @reader_loop.join + end + else + # single threaded mode, nothing to do. MK. + end end @reader_loop = nil end