lib/amqp/client/connection.rb in amqp-client-1.1.4 vs lib/amqp/client/connection.rb in amqp-client-1.1.5

- old
+ new

@@ -46,10 +46,13 @@ @channels = {} @closed = nil @replies = ::Queue.new @write_lock = Mutex.new @blocked = nil + @on_blocked = ->(reason) { warn "AMQP-Client blocked by broker: #{reason}" } + @on_unblocked = -> { warn "AMQP-Client unblocked by broker" } + Thread.new { read_loop } if read_loop_thread end # Alias for {#initialize} # @see #initialize @@ -123,24 +126,37 @@ # @return [Boolean] def closed? !@closed.nil? end + # @!group Callbacks + + # Callback called when client is blocked by the broker + # @yield [String] reason to why the connection is being blocked + # @return [nil] + def on_blocked(&blk) + @on_blocked = blk + nil + end + + # Callback called when client is unblocked by the broker + # @yield + # @return [nil] + def on_unblocked(&blk) + @on_unblocked = blk + nil + end + + # @!endgroup + # Write byte array(s) directly to the socket (thread-safe) # @param bytes [String] One or more byte arrays # @return [Integer] number of bytes written # @api private def write_bytes(*bytes) - blocked = @blocked - warn "AMQP-Client blocked by broker: #{blocked}" if blocked @write_lock.synchronize do - warn "AMQP-Client unblocked by broker" if blocked - if RUBY_ENGINE == "truffleruby" - bytes.each { |b| @socket.write b } - else - @socket.write(*bytes) - end + @socket.write(*bytes) end rescue *READ_EXCEPTIONS => e raise Error::ConnectionClosed.new(*@closed) if @closed raise Error, "Could not write to socket, #{e.message}" @@ -164,11 +180,11 @@ # read the frame content socket.read(frame_size, frame_buffer) || raise(IOError) # make sure that the frame end is correct frame_end = socket.readchar.ord - raise UnexpectedFrameEnd, frame_end if frame_end != 206 + raise Error::UnexpectedFrameEnd, frame_end if frame_end != 206 # parse the frame, will return false if a close frame was received parse_frame(type, channel_id, frame_buffer) || return end nil @@ -177,12 +193,16 @@ nil # ignore read errors ensure @closed ||= [400, "unknown"] @replies.close begin - @write_lock.synchronize do + if @write_lock.owned? # if connection is blocked @socket.close + else + @write_lock.synchronize do + @socket.close + end end rescue *READ_EXCEPTIONS nil end end @@ -190,11 +210,12 @@ private READ_EXCEPTIONS = [IOError, OpenSSL::OpenSSLError, SystemCallError, RUBY_ENGINE == "jruby" ? java.lang.NullPointerException : nil].compact.freeze - def parse_frame(type, channel_id, buf) + def parse_frame(type, channel_id, buf) # rubocop:disable Metrics/MethodLength + channel = @channels[channel_id] case type when 1 # method frame class_id, method_id = buf.unpack("S> S>") case class_id when 10 # connection @@ -219,19 +240,21 @@ when 60 # connection#blocked reason_len = buf.getbyte(4) reason = buf.byteslice(5, reason_len).force_encoding("utf-8") @blocked = reason @write_lock.lock + @on_blocked.call(reason) when 61 # connection#unblocked - @blocked = nil @write_lock.unlock + @blocked = nil + @on_unblocked.call else raise Error::UnsupportedMethodFrame, class_id, method_id end when 20 # channel case method_id when 11 # channel#open-ok - @channels[channel_id].reply [:channel_open_ok] + channel.reply [:channel_open_ok] when 40 # channel#close reply_code, reply_text_len = buf.unpack("@4 S> C") reply_text = buf.byteslice(7, reply_text_len).force_encoding("utf-8") classid, methodid = buf.byteslice(7 + reply_text_len, 4).unpack("S> S>") channel = @channels.delete(channel_id) @@ -243,55 +266,56 @@ else raise Error::UnsupportedMethodFrame, class_id, method_id end when 40 # exchange case method_id when 11 # declare-ok - @channels[channel_id].reply [:exchange_declare_ok] + channel.reply [:exchange_declare_ok] when 21 # delete-ok - @channels[channel_id].reply [:exchange_delete_ok] + channel.reply [:exchange_delete_ok] when 31 # bind-ok - @channels[channel_id].reply [:exchange_bind_ok] + channel.reply [:exchange_bind_ok] when 51 # unbind-ok - @channels[channel_id].reply [:exchange_unbind_ok] + channel.reply [:exchange_unbind_ok] else raise Error::UnsupportedMethodFrame, class_id, method_id end when 50 # queue case method_id when 11 # declare-ok queue_name_len = buf.getbyte(4) queue_name = buf.byteslice(5, queue_name_len).force_encoding("utf-8") message_count, consumer_count = buf.byteslice(5 + queue_name_len, 8).unpack("L> L>") - @channels[channel_id].reply [:queue_declare_ok, queue_name, message_count, consumer_count] + channel.reply [:queue_declare_ok, queue_name, message_count, consumer_count] when 21 # bind-ok - @channels[channel_id].reply [:queue_bind_ok] + channel.reply [:queue_bind_ok] when 31 # purge-ok - @channels[channel_id].reply [:queue_purge_ok] + message_count = buf.unpack1("@4 L>") + channel.reply [:queue_purge_ok, message_count] when 41 # delete-ok message_count = buf.unpack1("@4 L>") - @channels[channel_id].reply [:queue_delete, message_count] + channel.reply [:queue_delete, message_count] when 51 # unbind-ok - @channels[channel_id].reply [:queue_unbind_ok] + channel.reply [:queue_unbind_ok] else raise Error::UnsupportedMethodFrame.new class_id, method_id end when 60 # basic case method_id when 11 # qos-ok - @channels[channel_id].reply [:basic_qos_ok] + channel.reply [:basic_qos_ok] when 21 # consume-ok tag_len = buf.getbyte(4) tag = buf.byteslice(5, tag_len).force_encoding("utf-8") - @channels[channel_id].reply [:basic_consume_ok, tag] + channel.reply [:basic_consume_ok, tag] when 30 # cancel tag_len = buf.getbyte(4) tag = buf.byteslice(5, tag_len).force_encoding("utf-8") no_wait = buf.getbyte(5 + tag_len) == 1 - @channels[channel_id].close_consumer(tag) + channel.close_consumer(tag) write_bytes FrameBytes.basic_cancel_ok(@id, tag) unless no_wait when 31 # cancel-ok tag_len = buf.getbyte(4) tag = buf.byteslice(5, tag_len).force_encoding("utf-8") - @channels[channel_id].reply [:basic_cancel_ok, tag] + channel.reply [:basic_cancel_ok, tag] when 50 # return reply_code, reply_text_len = buf.unpack("@4 S> C") pos = 7 reply_text = buf.byteslice(pos, reply_text_len).force_encoding("utf-8") pos += reply_text_len @@ -300,11 +324,11 @@ exchange = buf.byteslice(pos, exchange_len).force_encoding("utf-8") pos += exchange_len routing_key_len = buf.getbyte(pos) pos += 1 routing_key = buf.byteslice(pos, routing_key_len).force_encoding("utf-8") - @channels[channel_id].message_returned(reply_code, reply_text, exchange, routing_key) + channel.message_returned(reply_code, reply_text, exchange, routing_key) when 60 # deliver ctag_len = buf.getbyte(4) consumer_tag = buf.byteslice(5, ctag_len).force_encoding("utf-8") pos = 5 + ctag_len delivery_tag, redelivered, exchange_len = buf.byteslice(pos, 10).unpack("Q> C C") @@ -312,58 +336,58 @@ exchange = buf.byteslice(pos, exchange_len).force_encoding("utf-8") pos += exchange_len rk_len = buf.getbyte(pos) pos += 1 routing_key = buf.byteslice(pos, rk_len).force_encoding("utf-8") - @channels[channel_id].message_delivered(consumer_tag, delivery_tag, redelivered == 1, exchange, routing_key) + channel.message_delivered(consumer_tag, delivery_tag, redelivered == 1, exchange, routing_key) when 71 # get-ok delivery_tag, redelivered, exchange_len = buf.unpack("@4 Q> C C") pos = 14 exchange = buf.byteslice(pos, exchange_len).force_encoding("utf-8") pos += exchange_len routing_key_len = buf.getbyte(pos) pos += 1 routing_key = buf.byteslice(pos, routing_key_len).force_encoding("utf-8") # pos += routing_key_len # message_count = buf.byteslice(pos, 4).unpack1("L>") - @channels[channel_id].message_delivered(nil, delivery_tag, redelivered == 1, exchange, routing_key) + channel.message_delivered(nil, delivery_tag, redelivered == 1, exchange, routing_key) when 72 # get-empty - @channels[channel_id].basic_get_empty + channel.basic_get_empty when 80 # ack delivery_tag, multiple = buf.unpack("@4 Q> C") - @channels[channel_id].confirm [:ack, delivery_tag, multiple == 1] + channel.confirm [:ack, delivery_tag, multiple == 1] when 111 # recover-ok - @channels[channel_id].reply [:basic_recover_ok] + channel.reply [:basic_recover_ok] when 120 # nack delivery_tag, multiple, requeue = buf.unpack("@4 Q> C C") - @channels[channel_id].confirm [:nack, delivery_tag, multiple == 1, requeue == 1] + channel.confirm [:nack, delivery_tag, multiple == 1, requeue == 1] else raise Error::UnsupportedMethodFrame.new class_id, method_id end when 85 # confirm case method_id when 11 # select-ok - @channels[channel_id].reply [:confirm_select_ok] + channel.reply [:confirm_select_ok] else raise Error::UnsupportedMethodFrame.new class_id, method_id end when 90 # tx case method_id when 11 # select-ok - @channels[channel_id].reply [:tx_select_ok] + channel.reply [:tx_select_ok] when 21 # commit-ok - @channels[channel_id].reply [:tx_commit_ok] + channel.reply [:tx_commit_ok] when 31 # rollback-ok - @channels[channel_id].reply [:tx_rollback_ok] + channel.reply [:tx_rollback_ok] else raise Error::UnsupportedMethodFrame.new class_id, method_id end else raise Error::UnsupportedMethodFrame.new class_id, method_id end when 2 # header body_size = buf.unpack1("@4 Q>") properties = Properties.decode(buf, 12) - @channels[channel_id].header_delivered body_size, properties + channel.header_delivered body_size, properties when 3 # body - @channels[channel_id].body_delivered buf + channel.body_delivered buf else raise Error::UnsupportedFrameType, type end true end @@ -417,10 +441,10 @@ raise Error, "Could not establish AMQP connection: #{e.message}" end type, channel_id, frame_size = buf.unpack("C S> L>") frame_end = buf.getbyte(frame_size + 7) - raise UnexpectedFrameEndError, frame_end if frame_end != 206 + raise Error::UnexpectedFrameEnd, frame_end if frame_end != 206 case type when 1 # method frame class_id, method_id = buf.unpack("@7 S> S>") case class_id