lib/amqp/client/connection.rb in amqp-client-1.1.5 vs lib/amqp/client/connection.rb in amqp-client-1.1.6
- old
+ new
@@ -52,10 +52,20 @@
@on_unblocked = -> { warn "AMQP-Client unblocked by broker" }
Thread.new { read_loop } if read_loop_thread
end
+ # Indicates that the server is blocking publishes.
+ # If the client keeps publishing the server will stop reading from the socket.
+ # Use the #on_blocked callback to get notified when the server is resource constrained.
+ # @see #on_blocked
+ # @see #on_unblocked
+ # @return [Bool]
+ def blocked?
+ !@blocked.nil?
+ end
+
# Alias for {#initialize}
# @see #initialize
# @deprecated
def self.connect(uri, read_loop_thread: true, **options)
new(uri, read_loop_thread: read_loop_thread, **options)
@@ -239,13 +249,11 @@
return false
when 60 # connection#blocked
reason_len = buf.getbyte(4)
reason = buf.byteslice(5, reason_len).force_encoding("utf-8")
@blocked = reason
- @write_lock.lock
@on_blocked.call(reason)
when 61 # connection#unblocked
- @write_lock.unlock
@blocked = nil
@on_unblocked.call
else raise Error::UnsupportedMethodFrame, class_id, method_id
end
when 20 # channel