lib/amqp/client/connection.rb in amqp-client-1.1.4 vs lib/amqp/client/connection.rb in amqp-client-1.1.5
- old
+ new
@@ -46,10 +46,13 @@
@channels = {}
@closed = nil
@replies =
@write_lock =
@blocked = nil
+ @on_blocked = ->(reason) { warn "AMQP-Client blocked by broker: #{reason}" }
+ @on_unblocked = -> { warn "AMQP-Client unblocked by broker" }
+ { read_loop } if read_loop_thread
# Alias for {#initialize}
# @see #initialize
@@ -123,24 +126,37 @@
# @return [Boolean]
def closed?
+ # @!group Callbacks
+ # Callback called when client is blocked by the broker
+ # @yield [String] reason to why the connection is being blocked
+ # @return [nil]
+ def on_blocked(&blk)
+ @on_blocked = blk
+ nil
+ end
+ # Callback called when client is unblocked by the broker
+ # @yield
+ # @return [nil]
+ def on_unblocked(&blk)
+ @on_unblocked = blk
+ nil
+ end
+ # @!endgroup
# Write byte array(s) directly to the socket (thread-safe)
# @param bytes [String] One or more byte arrays
# @return [Integer] number of bytes written
# @api private
def write_bytes(*bytes)
- blocked = @blocked
- warn "AMQP-Client blocked by broker: #{blocked}" if blocked
@write_lock.synchronize do
- warn "AMQP-Client unblocked by broker" if blocked
- if RUBY_ENGINE == "truffleruby"
- bytes.each { |b| @socket.write b }
- else
- @socket.write(*bytes)
- end
+ @socket.write(*bytes)
rescue *READ_EXCEPTIONS => e
raise*@closed) if @closed
raise Error, "Could not write to socket, #{e.message}"
@@ -164,11 +180,11 @@
# read the frame content, frame_buffer) || raise(IOError)
# make sure that the frame end is correct
frame_end = socket.readchar.ord
- raise UnexpectedFrameEnd, frame_end if frame_end != 206
+ raise Error::UnexpectedFrameEnd, frame_end if frame_end != 206
# parse the frame, will return false if a close frame was received
parse_frame(type, channel_id, frame_buffer) || return
@@ -177,12 +193,16 @@
nil # ignore read errors
@closed ||= [400, "unknown"]
- @write_lock.synchronize do
+ if @write_lock.owned? # if connection is blocked
+ else
+ @write_lock.synchronize do
+ @socket.close
+ end
@@ -190,11 +210,12 @@
READ_EXCEPTIONS = [IOError, OpenSSL::OpenSSLError, SystemCallError,
RUBY_ENGINE == "jruby" ? java.lang.NullPointerException : nil].compact.freeze
- def parse_frame(type, channel_id, buf)
+ def parse_frame(type, channel_id, buf) # rubocop:disable Metrics/MethodLength
+ channel = @channels[channel_id]
case type
when 1 # method frame
class_id, method_id = buf.unpack("S> S>")
case class_id
when 10 # connection
@@ -219,19 +240,21 @@
when 60 # connection#blocked
reason_len = buf.getbyte(4)
reason = buf.byteslice(5, reason_len).force_encoding("utf-8")
@blocked = reason
when 61 # connection#unblocked
- @blocked = nil
+ @blocked = nil
else raise Error::UnsupportedMethodFrame, class_id, method_id
when 20 # channel
case method_id
when 11 # channel#open-ok
- @channels[channel_id].reply [:channel_open_ok]
+ channel.reply [:channel_open_ok]
when 40 # channel#close
reply_code, reply_text_len = buf.unpack("@4 S> C")
reply_text = buf.byteslice(7, reply_text_len).force_encoding("utf-8")
classid, methodid = buf.byteslice(7 + reply_text_len, 4).unpack("S> S>")
channel = @channels.delete(channel_id)
@@ -243,55 +266,56 @@
else raise Error::UnsupportedMethodFrame, class_id, method_id
when 40 # exchange
case method_id
when 11 # declare-ok
- @channels[channel_id].reply [:exchange_declare_ok]
+ channel.reply [:exchange_declare_ok]
when 21 # delete-ok
- @channels[channel_id].reply [:exchange_delete_ok]
+ channel.reply [:exchange_delete_ok]
when 31 # bind-ok
- @channels[channel_id].reply [:exchange_bind_ok]
+ channel.reply [:exchange_bind_ok]
when 51 # unbind-ok
- @channels[channel_id].reply [:exchange_unbind_ok]
+ channel.reply [:exchange_unbind_ok]
else raise Error::UnsupportedMethodFrame, class_id, method_id
when 50 # queue
case method_id
when 11 # declare-ok
queue_name_len = buf.getbyte(4)
queue_name = buf.byteslice(5, queue_name_len).force_encoding("utf-8")
message_count, consumer_count = buf.byteslice(5 + queue_name_len, 8).unpack("L> L>")
- @channels[channel_id].reply [:queue_declare_ok, queue_name, message_count, consumer_count]
+ channel.reply [:queue_declare_ok, queue_name, message_count, consumer_count]
when 21 # bind-ok
- @channels[channel_id].reply [:queue_bind_ok]
+ channel.reply [:queue_bind_ok]
when 31 # purge-ok
- @channels[channel_id].reply [:queue_purge_ok]
+ message_count = buf.unpack1("@4 L>")
+ channel.reply [:queue_purge_ok, message_count]
when 41 # delete-ok
message_count = buf.unpack1("@4 L>")
- @channels[channel_id].reply [:queue_delete, message_count]
+ channel.reply [:queue_delete, message_count]
when 51 # unbind-ok
- @channels[channel_id].reply [:queue_unbind_ok]
+ channel.reply [:queue_unbind_ok]
else raise class_id, method_id
when 60 # basic
case method_id
when 11 # qos-ok
- @channels[channel_id].reply [:basic_qos_ok]
+ channel.reply [:basic_qos_ok]
when 21 # consume-ok
tag_len = buf.getbyte(4)
tag = buf.byteslice(5, tag_len).force_encoding("utf-8")
- @channels[channel_id].reply [:basic_consume_ok, tag]
+ channel.reply [:basic_consume_ok, tag]
when 30 # cancel
tag_len = buf.getbyte(4)
tag = buf.byteslice(5, tag_len).force_encoding("utf-8")
no_wait = buf.getbyte(5 + tag_len) == 1
- @channels[channel_id].close_consumer(tag)
+ channel.close_consumer(tag)
write_bytes FrameBytes.basic_cancel_ok(@id, tag) unless no_wait
when 31 # cancel-ok
tag_len = buf.getbyte(4)
tag = buf.byteslice(5, tag_len).force_encoding("utf-8")
- @channels[channel_id].reply [:basic_cancel_ok, tag]
+ channel.reply [:basic_cancel_ok, tag]
when 50 # return
reply_code, reply_text_len = buf.unpack("@4 S> C")
pos = 7
reply_text = buf.byteslice(pos, reply_text_len).force_encoding("utf-8")
pos += reply_text_len
@@ -300,11 +324,11 @@
exchange = buf.byteslice(pos, exchange_len).force_encoding("utf-8")
pos += exchange_len
routing_key_len = buf.getbyte(pos)
pos += 1
routing_key = buf.byteslice(pos, routing_key_len).force_encoding("utf-8")
- @channels[channel_id].message_returned(reply_code, reply_text, exchange, routing_key)
+ channel.message_returned(reply_code, reply_text, exchange, routing_key)
when 60 # deliver
ctag_len = buf.getbyte(4)
consumer_tag = buf.byteslice(5, ctag_len).force_encoding("utf-8")
pos = 5 + ctag_len
delivery_tag, redelivered, exchange_len = buf.byteslice(pos, 10).unpack("Q> C C")
@@ -312,58 +336,58 @@
exchange = buf.byteslice(pos, exchange_len).force_encoding("utf-8")
pos += exchange_len
rk_len = buf.getbyte(pos)
pos += 1
routing_key = buf.byteslice(pos, rk_len).force_encoding("utf-8")
- @channels[channel_id].message_delivered(consumer_tag, delivery_tag, redelivered == 1, exchange, routing_key)
+ channel.message_delivered(consumer_tag, delivery_tag, redelivered == 1, exchange, routing_key)
when 71 # get-ok
delivery_tag, redelivered, exchange_len = buf.unpack("@4 Q> C C")
pos = 14
exchange = buf.byteslice(pos, exchange_len).force_encoding("utf-8")
pos += exchange_len
routing_key_len = buf.getbyte(pos)
pos += 1
routing_key = buf.byteslice(pos, routing_key_len).force_encoding("utf-8")
# pos += routing_key_len
# message_count = buf.byteslice(pos, 4).unpack1("L>")
- @channels[channel_id].message_delivered(nil, delivery_tag, redelivered == 1, exchange, routing_key)
+ channel.message_delivered(nil, delivery_tag, redelivered == 1, exchange, routing_key)
when 72 # get-empty
- @channels[channel_id].basic_get_empty
+ channel.basic_get_empty
when 80 # ack
delivery_tag, multiple = buf.unpack("@4 Q> C")
- @channels[channel_id].confirm [:ack, delivery_tag, multiple == 1]
+ channel.confirm [:ack, delivery_tag, multiple == 1]
when 111 # recover-ok
- @channels[channel_id].reply [:basic_recover_ok]
+ channel.reply [:basic_recover_ok]
when 120 # nack
delivery_tag, multiple, requeue = buf.unpack("@4 Q> C C")
- @channels[channel_id].confirm [:nack, delivery_tag, multiple == 1, requeue == 1]
+ channel.confirm [:nack, delivery_tag, multiple == 1, requeue == 1]
else raise class_id, method_id
when 85 # confirm
case method_id
when 11 # select-ok
- @channels[channel_id].reply [:confirm_select_ok]
+ channel.reply [:confirm_select_ok]
else raise class_id, method_id
when 90 # tx
case method_id
when 11 # select-ok
- @channels[channel_id].reply [:tx_select_ok]
+ channel.reply [:tx_select_ok]
when 21 # commit-ok
- @channels[channel_id].reply [:tx_commit_ok]
+ channel.reply [:tx_commit_ok]
when 31 # rollback-ok
- @channels[channel_id].reply [:tx_rollback_ok]
+ channel.reply [:tx_rollback_ok]
else raise class_id, method_id
else raise class_id, method_id
when 2 # header
body_size = buf.unpack1("@4 Q>")
properties = Properties.decode(buf, 12)
- @channels[channel_id].header_delivered body_size, properties
+ channel.header_delivered body_size, properties
when 3 # body
- @channels[channel_id].body_delivered buf
+ channel.body_delivered buf
else raise Error::UnsupportedFrameType, type
@@ -417,10 +441,10 @@
raise Error, "Could not establish AMQP connection: #{e.message}"
type, channel_id, frame_size = buf.unpack("C S> L>")
frame_end = buf.getbyte(frame_size + 7)
- raise UnexpectedFrameEndError, frame_end if frame_end != 206
+ raise Error::UnexpectedFrameEnd, frame_end if frame_end != 206
case type
when 1 # method frame
class_id, method_id = buf.unpack("@7 S> S>")
case class_id