lib/amqp/client/connection.rb in amqp-client-1.0.1 vs lib/amqp/client/connection.rb in amqp-client-1.0.2
- old
+ new
@@ -57,13 +57,14 @@
@socket = socket
@channel_max = channel_max.zero? ? 65_536 : channel_max
@frame_max = frame_max
@heartbeat = heartbeat
@channels = {}
- @closed = false
+ @closed = nil
@replies = ::Queue.new
@write_lock = Mutex.new
+ @blocked = nil
Thread.new { read_loop } if read_loop_thread
end
# The max frame size negotiated between the client and the broker
# @return [Integer]
@@ -113,32 +114,41 @@
# @param code [Integer]
# @return [nil]
def close(reason: "", code: 200)
return if @closed
- @closed = true
- write_bytes FrameBytes.connection_close(code, reason)
- @channels.each_value { |ch| ch.closed!(code, reason, 0, 0) }
- expect(:close_ok)
+ @closed = [code, reason]
+ @channels.each_value { |ch| ch.closed!(:connection, code, reason, 0, 0) }
+ if @blocked
+ @socket.close
+ else
+ write_bytes FrameBytes.connection_close(code, reason)
+ expect(:close_ok)
+ end
nil
end
# True if the connection is closed
# @return [Boolean]
def closed?
- @closed
+ !@closed.nil?
end
# Write byte array(s) directly to the socket (thread-safe)
# @param bytes [String] One or more byte arrays
# @return [Integer] number of bytes written
# @api private
def write_bytes(*bytes)
+ blocked = @blocked
+ warn "AMQP-Client blocked by broker: #{blocked}" if blocked
@write_lock.synchronize do
+ warn "AMQP-Client unblocked by broker" if blocked
@socket.write(*bytes)
end
rescue IOError, OpenSSL::OpenSSLError, SystemCallError => e
+ raise Error::ConnectionClosed.new(*@closed) if @closed
+
raise Error, "Could not write to socket, #{e.message}"
end
# Reads from the socket, required for any kind of progress.
# Blocks until the connection is closed. Normally run as a background thread automatically.
@@ -165,14 +175,14 @@
# parse the frame, will return false if a close frame was received
parse_frame(type, channel_id, frame_buffer) || return
end
nil
rescue IOError, OpenSSL::OpenSSLError, SystemCallError => e
- warn "AMQP-Client read error: #{e.inspect}"
+ @closed ||= [400, "read error: #{e.message}"]
nil # ignore read errors
ensure
- @closed = true
+ @closed ||= [400, "unknown"]
@replies.close
begin
@socket.close
rescue IOError, OpenSSL::OpenSSLError, SystemCallError
nil
@@ -189,25 +199,32 @@
when 10 # connection
raise Error, "Unexpected channel id #{channel_id} for Connection frame" if channel_id != 0
case method_id
when 50 # connection#close
- @closed = true
code, text_len = buf.unpack("@4 S> C")
text = buf.byteslice(7, text_len).force_encoding("utf-8")
error_class_id, error_method_id = buf.byteslice(7 + text_len, 4).unpack("S> S>")
- @channels.each_value { |ch| ch.closed!(code, text, error_class_id, error_method_id) }
+ @closed = [code, text, error_class_id, error_method_id]
+ @channels.each_value { |ch| ch.closed!(:connection, code, text, error_class_id, error_method_id) }
begin
write_bytes FrameBytes.connection_close_ok
rescue Error
nil # rabbitmq closes the socket after sending Connection::Close, so ignore write errors
end
return false
when 51 # connection#close-ok
- @closed = true
@replies.push [:close_ok]
return false
+ when 60 # connection#blocked
+ reason_len = buf.unpack1("@4 C")
+ reason = buf.byteslice(5, reason_len).force_encoding("utf-8")
+ @blocked = reason
+ @write_lock.lock
+ when 61 # connection#unblocked
+ @blocked = nil
+ @write_lock.unlock
else raise Error::UnsupportedMethodFrame, class_id, method_id
end
when 20 # channel
case method_id
when 11 # channel#open-ok
@@ -215,10 +232,10 @@
when 40 # channel#close
reply_code, reply_text_len = buf.unpack("@4 S> C")
reply_text = buf.byteslice(7, reply_text_len).force_encoding("utf-8")
classid, methodid = buf.byteslice(7 + reply_text_len, 4).unpack("S> S>")
channel = @channels.delete(channel_id)
- channel.closed!(reply_code, reply_text, classid, methodid)
+ channel.closed!(:channel, reply_code, reply_text, classid, methodid)
write_bytes FrameBytes.channel_close_ok(channel_id)
when 41 # channel#close-ok
channel = @channels.delete(channel_id)
channel.reply [:channel_close_ok]
else raise Error::UnsupportedMethodFrame, class_id, method_id