lib/amqp/client/connection.rb in amqp-client-0.3.0 vs lib/amqp/client/connection.rb in amqp-client-1.0.0
- old
+ new
@@ -8,13 +8,11 @@
require_relative "./errors"
module AMQP
# Represents a single AMQP connection
class Connection
- def self.connect(uri, **options)
- read_loop_thread = options[:read_loop_thread] || true
-
+ def self.connect(uri, read_loop_thread: true, **options)
uri = URI.parse(uri)
tls = uri.scheme == "amqps"
port = port_from_env || uri.port || (tls ? 5671 : 5672)
host = uri.host || "localhost"
user = uri.user || "guest"
@@ -32,33 +30,47 @@
context.verify_mode = OpenSSL::SSL::VERIFY_PEER unless [false, "false", "none"].include? options[:verify_peer]
socket = OpenSSL::SSL::SSLSocket.new(socket, context)
socket.sync_close = true # closing the TLS socket also closes the TCP socket
socket.hostname = host # SNI host
socket.connect
+ socket.post_connection_check(host) || raise(AMQP::Client::Error, "TLS certificate hostname doesn't match requested")
end
channel_max, frame_max, heartbeat = establish(socket, user, password, vhost, **options)
Connection.new(socket, channel_max, frame_max, heartbeat, read_loop_thread: read_loop_thread)
end
def initialize(socket, channel_max, frame_max, heartbeat, read_loop_thread: true)
@socket = socket
- @channel_max = channel_max
+ @channel_max = channel_max.zero? ? 65_536 : channel_max
@frame_max = frame_max
@heartbeat = heartbeat
@channels = {}
@closed = false
@replies = Queue.new
+ @write_lock = Mutex.new
Thread.new { read_loop } if read_loop_thread
end
attr_reader :frame_max
+ def inspect
+ "#<#{self.class} @closed=#{@closed} channel_count=#{@channels.size}>"
+ end
+
def channel(id = nil)
+ raise ArgumentError, "Channel ID cannot be 0" if id&.zero?
+ raise ArgumentError, "Channel ID higher than connection's channel max #{@channel_max}" if id && id > @channel_max
+
if id
ch = @channels[id] ||= Channel.new(self, id)
else
- id = 1.upto(@channel_max) { |i| break i unless @channels.key? i }
+ id = nil
+ 1.upto(@channel_max) do |i|
+ break id = i unless @channels.key? i
+ end
+ raise AMQP::Client::Error, "Max channels reached" if id.nil?
+
ch = @channels[id] = Channel.new(self, id)
end
ch.open
end
@@ -73,98 +85,110 @@
end
def close(reason = "", code = 200)
return if @closed
+ @closed = true
write_bytes FrameBytes.connection_close(code, reason)
+ @channels.each_value { |ch| ch.closed!(code, reason, 0, 0) }
expect(:close_ok)
- @closed = true
end
def closed?
@closed
end
def write_bytes(*bytes)
- @socket.write(*bytes)
+ if @socket.is_a? OpenSSL::SSL::SSLSocket
+ @write_lock.synchronize do
+ @socket.write(*bytes)
+ end
+ else
+ @socket.write(*bytes)
+ end
rescue IOError, OpenSSL::OpenSSLError, SystemCallError => e
- raise AMQP::Client::Error.new("Could not write to socket", cause: e)
+ raise AMQP::Client::Error, "Could not write to socket, #{e.message}"
end
# Reads from the socket, required for any kind of progress. Blocks until the connection is closed
def read_loop
+ # read more often than write so that channel errors crop up early
+ Thread.current.priority += 1
socket = @socket
frame_max = @frame_max
- buffer = String.new(capacity: frame_max)
+ frame_start = String.new(capacity: 7)
+ frame_buffer = String.new(capacity: frame_max)
loop do
- begin
- socket.readpartial(frame_max, buffer)
- rescue IOError, OpenSSL::OpenSSLError, SystemCallError
- break
+ socket.read(7, frame_start)
+ type, channel_id, frame_size = frame_start.unpack("C S> L>")
+ if frame_size > frame_max
+ raise AMQP::Client::Error, "Frame size #{frame_size} is larger than negotiated max frame size #{frame_max}"
end
- pos = 0
- while pos < buffer.bytesize
- buffer += socket.read(pos + 8 - buffer.bytesize) if pos + 8 > buffer.bytesize
- type, channel_id, frame_size = buffer.byteslice(pos, 7).unpack("C S> L>")
- if frame_size > frame_max
- raise AMQP::Client::Error, "Frame size #{frame_size} larger than negotiated max frame size #{frame_max}"
- end
+ # read the frame content
+ socket.read(frame_size, frame_buffer)
- frame_end_pos = pos + 7 + frame_size
- buffer += socket.read(frame_end_pos - buffer.bytesize + 1) if frame_end_pos + 1 > buffer.bytesize
- frame_end = buffer[frame_end_pos].ord
- raise AMQP::Client::UnexpectedFrameEnd, frame_end if frame_end != 206
+ # make sure that the frame end is correct
+ frame_end = socket.readchar.ord
+ raise AMQP::Client::UnexpectedFrameEnd, frame_end if frame_end != 206
- buf = buffer.byteslice(pos, frame_size + 8)
- pos += frame_size + 8
- parse_frame(type, channel_id, frame_size, buf) || return
- end
+ # parse the frame, will return false if a close frame was received
+ parse_frame(type, channel_id, frame_size, frame_buffer) || return
end
+ rescue IOError, OpenSSL::OpenSSLError, SystemCallError => e
+ warn "AMQP-Client read error: #{e.inspect}"
+ nil # ignore read errors
ensure
@closed = true
@replies.close
begin
@socket.close
- rescue IOError
+ rescue IOError, OpenSSL::OpenSSLError, SystemCallError
nil
end
end
private
def parse_frame(type, channel_id, frame_size, buf)
case type
when 1 # method frame
- class_id, method_id = buf.unpack("@7 S> S>")
+ class_id, method_id = buf.unpack("S> S>")
case class_id
when 10 # connection
raise AMQP::Client::Error, "Unexpected channel id #{channel_id} for Connection frame" if channel_id != 0
case method_id
when 50 # connection#close
- code, text_len = buf.unpack("@11 S> C")
- text = buf.byteslice(14, text_len).force_encoding("utf-8")
- error_class_id, error_method_id = buf.byteslice(14 + text_len, 4).unpack("S> S>")
- warn "Connection closed #{code} #{text} #{error_class_id} #{error_method_id}"
- write_bytes FrameBytes.connection_close_ok
+ @closed = true
+ code, text_len = buf.unpack("@4 S> C")
+ text = buf.byteslice(7, text_len).force_encoding("utf-8")
+ error_class_id, error_method_id = buf.byteslice(7 + text_len, 4).unpack("S> S>")
+ @channels.each_value { |ch| ch.closed!(code, text, error_class_id, error_method_id) }
+ begin
+ write_bytes FrameBytes.connection_close_ok
+ rescue AMQP::Client::Error
+ nil # rabbitmq closes the socket after sending Connection::Close, so ignore write errors
+ end
return false
when 51 # connection#close-ok
+ @closed = true
@replies.push [:close_ok]
return false
else raise AMQP::Client::UnsupportedMethodFrame, class_id, method_id
end
when 20 # channel
case method_id
when 11 # channel#open-ok
@channels[channel_id].reply [:channel_open_ok]
when 40 # channel#close
- reply_code, reply_text_len = buf.unpack("@11 S> C")
- reply_text = buf.byteslice(14, reply_text_len).force_encoding("utf-8")
- classid, methodid = buf.byteslice(14 + reply_text_len, 4).unpack("S> S>")
+ reply_code, reply_text_len = buf.unpack("@4 S> C")
+ reply_text = buf.byteslice(7, reply_text_len).force_encoding("utf-8")
+ classid, methodid = buf.byteslice(7 + reply_text_len, 4).unpack("S> S>")
channel = @channels.delete(channel_id)
channel.closed!(reply_code, reply_text, classid, methodid)
+ write_bytes FrameBytes.channel_close_ok(channel_id)
when 41 # channel#close-ok
channel = @channels.delete(channel_id)
channel.reply [:channel_close_ok]
else raise AMQP::Client::UnsupportedMethodFrame, class_id, method_id
end
@@ -181,46 +205,46 @@
else raise AMQP::Client::UnsupportedMethodFrame, class_id, method_id
end
when 50 # queue
case method_id
when 11 # declare-ok
- queue_name_len = buf.unpack1("@11 C")
- queue_name = buf.byteslice(12, queue_name_len).force_encoding("utf-8")
- message_count, consumer_count = buf.byteslice(12 + queue_name_len, 8).unpack1("L> L>")
+ queue_name_len = buf.unpack1("@4 C")
+ queue_name = buf.byteslice(5, queue_name_len).force_encoding("utf-8")
+ message_count, consumer_count = buf.byteslice(5 + queue_name_len, 8).unpack("L> L>")
@channels[channel_id].reply [:queue_declare_ok, queue_name, message_count, consumer_count]
when 21 # bind-ok
@channels[channel_id].reply [:queue_bind_ok]
when 31 # purge-ok
@channels[channel_id].reply [:queue_purge_ok]
when 41 # delete-ok
- message_count = buf.unpack1("@11 L>")
+ message_count = buf.unpack1("@4 L>")
@channels[channel_id].reply [:queue_delete, message_count]
when 51 # unbind-ok
@channels[channel_id].reply [:queue_unbind_ok]
else raise AMQP::Client::UnsupportedMethodFrame.new class_id, method_id
end
when 60 # basic
case method_id
when 11 # qos-ok
@channels[channel_id].reply [:basic_qos_ok]
when 21 # consume-ok
- tag_len = buf.unpack1("@11 C")
- tag = buf.byteslice(12, tag_len).force_encoding("utf-8")
+ tag_len = buf.unpack1("@4 C")
+ tag = buf.byteslice(5, tag_len).force_encoding("utf-8")
@channels[channel_id].reply [:basic_consume_ok, tag]
when 30 # cancel
- tag_len = buf.unpack1("@11 C")
- tag = buf.byteslice(12, tag_len).force_encoding("utf-8")
- no_wait = buf[12 + tag_len].ord
- @channels[channel_id].consumers.fetch(tag).close
+ tag_len = buf.unpack1("@4 C")
+ tag = buf.byteslice(5, tag_len).force_encoding("utf-8")
+ no_wait = buf[5 + tag_len].ord
+ @channels[channel_id].close_consumer(tag)
write_bytes FrameBytes.basic_cancel_ok(@id, tag) unless no_wait == 1
when 31 # cancel-ok
- tag_len = buf.unpack1("@11 C")
- tag = buf.byteslice(12, tag_len).force_encoding("utf-8")
+ tag_len = buf.unpack1("@4 C")
+ tag = buf.byteslice(5, tag_len).force_encoding("utf-8")
@channels[channel_id].reply [:basic_cancel_ok, tag]
when 50 # return
- reply_code, reply_text_len = buf.unpack("@11 S> C")
- pos = 14
+ reply_code, reply_text_len = buf.unpack("@4 S> C")
+ pos = 7
reply_text = buf.byteslice(pos, reply_text_len).force_encoding("utf-8")
pos += reply_text_len
exchange_len = buf[pos].ord
pos += 1
exchange = buf.byteslice(pos, exchange_len).force_encoding("utf-8")
@@ -228,49 +252,41 @@
routing_key_len = buf[pos].ord
pos += 1
routing_key = buf.byteslice(pos, routing_key_len).force_encoding("utf-8")
@channels[channel_id].message_returned(reply_code, reply_text, exchange, routing_key)
when 60 # deliver
- ctag_len = buf[11].ord
- consumer_tag = buf.byteslice(12, ctag_len).force_encoding("utf-8")
- pos = 12 + ctag_len
+ ctag_len = buf[4].ord
+ consumer_tag = buf.byteslice(5, ctag_len).force_encoding("utf-8")
+ pos = 5 + ctag_len
delivery_tag, redelivered, exchange_len = buf.byteslice(pos, 10).unpack("Q> C C")
pos += 8 + 1 + 1
exchange = buf.byteslice(pos, exchange_len).force_encoding("utf-8")
pos += exchange_len
rk_len = buf[pos].ord
pos += 1
routing_key = buf.byteslice(pos, rk_len).force_encoding("utf-8")
- loop do
- if (consumer = @channels[channel_id].consumers[consumer_tag])
- consumer.push [:deliver, delivery_tag, redelivered == 1, exchange, routing_key]
- break
- else
- Thread.pass
- end
- end
+ @channels[channel_id].message_delivered(consumer_tag, delivery_tag, redelivered == 1, exchange, routing_key)
when 71 # get-ok
- delivery_tag, redelivered, exchange_len = buf.unpack("@11 Q> C C")
- pos = 21
+ delivery_tag, redelivered, exchange_len = buf.unpack("@4 Q> C C")
+ pos = 14
exchange = buf.byteslice(pos, exchange_len).force_encoding("utf-8")
pos += exchange_len
routing_key_len = buf[pos].ord
pos += 1
routing_key = buf.byteslice(pos, routing_key_len).force_encoding("utf-8")
pos += routing_key_len
- message_count = buf.byteslice(pos, 4).unpack1("L>")
- redelivered = redelivered == 1
- @channels[channel_id].reply [:basic_get_ok, delivery_tag, exchange, routing_key, message_count, redelivered]
+ _message_count = buf.byteslice(pos, 4).unpack1("L>")
+ @channels[channel_id].message_delivered(nil, delivery_tag, redelivered == 1, exchange, routing_key)
when 72 # get-empty
- @channels[channel_id].reply [:basic_get_empty]
+ @channels[channel_id].basic_get_empty
when 80 # ack
- delivery_tag, multiple = buf.unpack("@11 Q> C")
+ delivery_tag, multiple = buf.unpack("@4 Q> C")
@channels[channel_id].confirm [:ack, delivery_tag, multiple == 1]
when 111 # recover-ok
@channels[channel_id].reply [:basic_recover_ok]
when 120 # nack
- delivery_tag, multiple, requeue = buf.unpack("@11 Q> C C")
+ delivery_tag, multiple, requeue = buf.unpack("@4 Q> C C")
@channels[channel_id].confirm [:nack, delivery_tag, multiple == 1, requeue == 1]
else raise AMQP::Client::UnsupportedMethodFrame.new class_id, method_id
end
when 85 # confirm
case method_id
@@ -289,24 +305,23 @@
else raise AMQP::Client::UnsupportedMethodFrame.new class_id, method_id
end
else raise AMQP::Client::UnsupportedMethodFrame.new class_id, method_id
end
when 2 # header
- body_size = buf.unpack1("@11 Q>")
- properties = Properties.decode(buf.byteslice(19, buf.bytesize - 20))
- @channels[channel_id].reply [:header, body_size, properties]
+ body_size = buf.unpack1("@4 Q>")
+ properties = Properties.decode(buf.byteslice(12, buf.bytesize - 12))
+ @channels[channel_id].header_delivered body_size, properties
when 3 # body
- body = buf.byteslice(7, frame_size)
- @channels[channel_id].reply [:body, body]
+ @channels[channel_id].body_delivered buf
else raise AMQP::Client::UnsupportedFrameType, type
end
true
end
def expect(expected_frame_type)
- frame_type, args = @replies.shift
- frame_type == expected_frame_type || raise(UnexpectedFrame.new(expected_frame_type, frame_type))
+ frame_type, args = @replies.pop
+ frame_type == expected_frame_type || raise(AMQP::Client::UnexpectedFrame.new(expected_frame_type, frame_type))
args
end
def self.establish(socket, user, password, vhost, **options)
channel_max, frame_max, heartbeat = nil
@@ -318,11 +333,11 @@
rescue IOError, OpenSSL::OpenSSLError, SystemCallError => e
raise AMQP::Client::Error, "Could not establish AMQP connection: #{e.message}"
end
type, channel_id, frame_size = buf.unpack("C S> L>")
- frame_end = buf.unpack1("@#{frame_size + 7} C")
+ frame_end = buf[frame_size + 7].ord
raise UnexpectedFrameEndError, frame_end if frame_end != 206
case type
when 1 # method frame
class_id, method_id = buf.unpack("@7 S> S>")
@@ -365,10 +380,10 @@
socket.setsockopt(Socket::SOL_SOCKET, Socket::SO_KEEPALIVE, true)
socket.setsockopt(Socket::SOL_TCP, Socket::TCP_KEEPIDLE, 60)
socket.setsockopt(Socket::SOL_TCP, Socket::TCP_KEEPINTVL, 10)
socket.setsockopt(Socket::SOL_TCP, Socket::TCP_KEEPCNT, 3)
rescue StandardError => e
- warn "amqp-client: Could not enable TCP keepalive on socket. #{e.inspect}"
+ warn "AMQP-Client could not enable TCP keepalive on socket. #{e.inspect}"
end
def self.port_from_env
return unless (port = ENV["AMQP_PORT"])