lib/bunny/session.rb in bunny-1.0.0.pre3 vs lib/bunny/session.rb in bunny-1.0.0.pre4
- old
+ new
@@ -52,11 +52,12 @@
DEFAULT_CLIENT_PROPERTIES = {
:capabilities => {
:publisher_confirms => true,
:consumer_cancel_notify => true,
:exchange_exchange_bindings => true,
- :"basic.nack" => true
+ :"basic.nack" => true,
+ :"connection.blocked" => true
},
:product => "Bunny",
:platform => ::RUBY_DESCRIPTION,
:version => Bunny::VERSION,
:information => "http://rubybunny.info",
@@ -133,10 +134,11 @@
@network_recovery_interval = opts.fetch(:network_recovery_interval, DEFAULT_NETWORK_RECOVERY_INTERVAL)
# in ms
@continuation_timeout = opts.fetch(:continuation_timeout, DEFAULT_CONTINUATION_TIMEOUT)
@status = :not_connected
+ @blocked = false
# these are negotiated with the broker during the connection tuning phase
@client_frame_max = opts.fetch(:frame_max, DEFAULT_FRAME_MAX)
@client_channel_max = opts.fetch(:channel_max, 65536)
@client_heartbeat = self.heartbeat_from(opts)
@@ -206,10 +208,13 @@
# @api public
def start
return self if connected?
@status = :connecting
+ # reset here for cases when automatic network recovery kicks in
+ # when we were blocked. MK.
+ @blocked = false
self.reset_continuations
begin
# close existing transport if we have one,
# to not leak sockets
@@ -224,11 +229,11 @@
self.init_connection
self.open_connection
@reader_loop = nil
- self.start_reader_loop if @threaded
+ self.start_reader_loop if threaded?
@default_channel = self.create_channel
rescue Exception => e
@status = :not_connected
raise e
@@ -340,12 +345,40 @@
# @private
def exchange(*args)
@default_channel.exchange(*args)
end
+ # Defines a callback that will be executed when RabbitMQ blocks the connection
+ # because it is running low on memory or disk space (as configured via config file
+ # and/or rabbitmqctl).
+ #
+ # @yield [AMQ::Protocol::Connection::Blocked] connection.blocked method which provides a reason for blocking
+ #
+ # @api public
+ def on_blocked(&block)
+ @block_callback = block
+ end
+ # Defines a callback that will be executed when RabbitMQ unblocks the connection
+ # that was previously blocked, e.g. because the memory or disk space alarm has cleared.
#
+ # @see #on_blocked
+ # @api public
+ def on_unblocked(&block)
+ @unblock_callback = block
+ end
+
+ # @return [Boolean] true if the connection is currently blocked by RabbitMQ because it's running low on
+ # RAM, disk space, or other resource; false otherwise
+ # @see #on_blocked
+ # @see #on_unblocked
+ def blocked?
+ @blocked
+ end
+
+
+ #
# Implementation
#
# @private
def open_channel(ch)
@@ -416,10 +449,16 @@
@logger.error e.message
@logger.error e.backtrace
ensure
@continuations.push(:__unblock__)
end
+ when AMQ::Protocol::Connection::Blocked then
+ @blocked = true
+ @block_callback.call(method) if @block_callback
+ when AMQ::Protocol::Connection::Unblocked then
+ @blocked = false
+ @unblock_callback.call(method) if @unblock_callback
when AMQ::Protocol::Channel::Close then
begin
ch = @channels[ch_number]
ch.handle_method(method)
ensure
@@ -624,13 +663,26 @@
# @private
def maybe_shutdown_reader_loop
if @reader_loop
@reader_loop.stop
- # We don't need to kill the loop but
- # this is the easiest way to wait until the loop
- # is guaranteed to have terminated
- @reader_loop.kill
+ if threaded?
+ # this is the easiest way to wait until the loop
+ # is guaranteed to have terminated
+ @reader_loop.raise(ShutdownSignal)
+ # joining the thread here may take forever
+ # on JRuby because sun.nio.ch.KQueueArrayWrapper#kevent0 is
+ # a native method that cannot be (easily) interrupted.
+ # So we use this ugly hack or else our test suite takes forever
+ # to run on JRuby (a new connection is opened/closed per example). MK.
+ if RUBY_ENGINE == "jruby"
+ sleep 0.075
+ else
+ @reader_loop.join
+ end
+ else
+ # single threaded mode, nothing to do. MK.
+ end
end
@reader_loop = nil
end