lib/amqp/client/connection.rb in amqp-client-1.0.1 vs lib/amqp/client/connection.rb in amqp-client-1.0.2

- old
+ new

@@ -57,13 +57,14 @@ @socket = socket @channel_max = channel_max.zero? ? 65_536 : channel_max @frame_max = frame_max @heartbeat = heartbeat @channels = {} - @closed = false + @closed = nil @replies = ::Queue.new @write_lock = Mutex.new + @blocked = nil Thread.new { read_loop } if read_loop_thread end # The max frame size negotiated between the client and the broker # @return [Integer] @@ -113,32 +114,41 @@ # @param code [Integer] # @return [nil] def close(reason: "", code: 200) return if @closed - @closed = true - write_bytes FrameBytes.connection_close(code, reason) - @channels.each_value { |ch| ch.closed!(code, reason, 0, 0) } - expect(:close_ok) + @closed = [code, reason] + @channels.each_value { |ch| ch.closed!(:connection, code, reason, 0, 0) } + if @blocked + @socket.close + else + write_bytes FrameBytes.connection_close(code, reason) + expect(:close_ok) + end nil end # True if the connection is closed # @return [Boolean] def closed? - @closed + !@closed.nil? end # Write byte array(s) directly to the socket (thread-safe) # @param bytes [String] One or more byte arrays # @return [Integer] number of bytes written # @api private def write_bytes(*bytes) + blocked = @blocked + warn "AMQP-Client blocked by broker: #{blocked}" if blocked @write_lock.synchronize do + warn "AMQP-Client unblocked by broker" if blocked @socket.write(*bytes) end rescue IOError, OpenSSL::OpenSSLError, SystemCallError => e + raise Error::ConnectionClosed.new(*@closed) if @closed + raise Error, "Could not write to socket, #{e.message}" end # Reads from the socket, required for any kind of progress. # Blocks until the connection is closed. Normally run as a background thread automatically. @@ -165,14 +175,14 @@ # parse the frame, will return false if a close frame was received parse_frame(type, channel_id, frame_buffer) || return end nil rescue IOError, OpenSSL::OpenSSLError, SystemCallError => e - warn "AMQP-Client read error: #{e.inspect}" + @closed ||= [400, "read error: #{e.message}"] nil # ignore read errors ensure - @closed = true + @closed ||= [400, "unknown"] @replies.close begin @socket.close rescue IOError, OpenSSL::OpenSSLError, SystemCallError nil @@ -189,25 +199,32 @@ when 10 # connection raise Error, "Unexpected channel id #{channel_id} for Connection frame" if channel_id != 0 case method_id when 50 # connection#close - @closed = true code, text_len = buf.unpack("@4 S> C") text = buf.byteslice(7, text_len).force_encoding("utf-8") error_class_id, error_method_id = buf.byteslice(7 + text_len, 4).unpack("S> S>") - @channels.each_value { |ch| ch.closed!(code, text, error_class_id, error_method_id) } + @closed = [code, text, error_class_id, error_method_id] + @channels.each_value { |ch| ch.closed!(:connection, code, text, error_class_id, error_method_id) } begin write_bytes FrameBytes.connection_close_ok rescue Error nil # rabbitmq closes the socket after sending Connection::Close, so ignore write errors end return false when 51 # connection#close-ok - @closed = true @replies.push [:close_ok] return false + when 60 # connection#blocked + reason_len = buf.unpack1("@4 C") + reason = buf.byteslice(5, reason_len).force_encoding("utf-8") + @blocked = reason + @write_lock.lock + when 61 # connection#unblocked + @blocked = nil + @write_lock.unlock else raise Error::UnsupportedMethodFrame, class_id, method_id end when 20 # channel case method_id when 11 # channel#open-ok @@ -215,10 +232,10 @@ when 40 # channel#close reply_code, reply_text_len = buf.unpack("@4 S> C") reply_text = buf.byteslice(7, reply_text_len).force_encoding("utf-8") classid, methodid = buf.byteslice(7 + reply_text_len, 4).unpack("S> S>") channel = @channels.delete(channel_id) - channel.closed!(reply_code, reply_text, classid, methodid) + channel.closed!(:channel, reply_code, reply_text, classid, methodid) write_bytes FrameBytes.channel_close_ok(channel_id) when 41 # channel#close-ok channel = @channels.delete(channel_id) channel.reply [:channel_close_ok] else raise Error::UnsupportedMethodFrame, class_id, method_id