lib/amqp/client/connection.rb in amqp-client-0.3.0 vs lib/amqp/client/connection.rb in amqp-client-1.0.0

- old
+ new

@@ -8,13 +8,11 @@ require_relative "./errors" module AMQP # Represents a single AMQP connection class Connection - def self.connect(uri, **options) - read_loop_thread = options[:read_loop_thread] || true - + def self.connect(uri, read_loop_thread: true, **options) uri = URI.parse(uri) tls = uri.scheme == "amqps" port = port_from_env || uri.port || (tls ? 5671 : 5672) host = uri.host || "localhost" user = uri.user || "guest" @@ -32,33 +30,47 @@ context.verify_mode = OpenSSL::SSL::VERIFY_PEER unless [false, "false", "none"].include? options[:verify_peer] socket = OpenSSL::SSL::SSLSocket.new(socket, context) socket.sync_close = true # closing the TLS socket also closes the TCP socket socket.hostname = host # SNI host socket.connect + socket.post_connection_check(host) || raise(AMQP::Client::Error, "TLS certificate hostname doesn't match requested") end channel_max, frame_max, heartbeat = establish(socket, user, password, vhost, **options) Connection.new(socket, channel_max, frame_max, heartbeat, read_loop_thread: read_loop_thread) end def initialize(socket, channel_max, frame_max, heartbeat, read_loop_thread: true) @socket = socket - @channel_max = channel_max + @channel_max = channel_max.zero? ? 65_536 : channel_max @frame_max = frame_max @heartbeat = heartbeat @channels = {} @closed = false @replies = Queue.new + @write_lock = Mutex.new Thread.new { read_loop } if read_loop_thread end attr_reader :frame_max + def inspect + "#<#{self.class} @closed=#{@closed} channel_count=#{@channels.size}>" + end + def channel(id = nil) + raise ArgumentError, "Channel ID cannot be 0" if id&.zero? + raise ArgumentError, "Channel ID higher than connection's channel max #{@channel_max}" if id && id > @channel_max + if id ch = @channels[id] ||= Channel.new(self, id) else - id = 1.upto(@channel_max) { |i| break i unless @channels.key? i } + id = nil + 1.upto(@channel_max) do |i| + break id = i unless @channels.key? i + end + raise AMQP::Client::Error, "Max channels reached" if id.nil? + ch = @channels[id] = Channel.new(self, id) end ch.open end @@ -73,98 +85,110 @@ end def close(reason = "", code = 200) return if @closed + @closed = true write_bytes FrameBytes.connection_close(code, reason) + @channels.each_value { |ch| ch.closed!(code, reason, 0, 0) } expect(:close_ok) - @closed = true end def closed? @closed end def write_bytes(*bytes) - @socket.write(*bytes) + if @socket.is_a? OpenSSL::SSL::SSLSocket + @write_lock.synchronize do + @socket.write(*bytes) + end + else + @socket.write(*bytes) + end rescue IOError, OpenSSL::OpenSSLError, SystemCallError => e - raise AMQP::Client::Error.new("Could not write to socket", cause: e) + raise AMQP::Client::Error, "Could not write to socket, #{e.message}" end # Reads from the socket, required for any kind of progress. Blocks until the connection is closed def read_loop + # read more often than write so that channel errors crop up early + Thread.current.priority += 1 socket = @socket frame_max = @frame_max - buffer = String.new(capacity: frame_max) + frame_start = String.new(capacity: 7) + frame_buffer = String.new(capacity: frame_max) loop do - begin - socket.readpartial(frame_max, buffer) - rescue IOError, OpenSSL::OpenSSLError, SystemCallError - break + socket.read(7, frame_start) + type, channel_id, frame_size = frame_start.unpack("C S> L>") + if frame_size > frame_max + raise AMQP::Client::Error, "Frame size #{frame_size} is larger than negotiated max frame size #{frame_max}" end - pos = 0 - while pos < buffer.bytesize - buffer += socket.read(pos + 8 - buffer.bytesize) if pos + 8 > buffer.bytesize - type, channel_id, frame_size = buffer.byteslice(pos, 7).unpack("C S> L>") - if frame_size > frame_max - raise AMQP::Client::Error, "Frame size #{frame_size} larger than negotiated max frame size #{frame_max}" - end + # read the frame content + socket.read(frame_size, frame_buffer) - frame_end_pos = pos + 7 + frame_size - buffer += socket.read(frame_end_pos - buffer.bytesize + 1) if frame_end_pos + 1 > buffer.bytesize - frame_end = buffer[frame_end_pos].ord - raise AMQP::Client::UnexpectedFrameEnd, frame_end if frame_end != 206 + # make sure that the frame end is correct + frame_end = socket.readchar.ord + raise AMQP::Client::UnexpectedFrameEnd, frame_end if frame_end != 206 - buf = buffer.byteslice(pos, frame_size + 8) - pos += frame_size + 8 - parse_frame(type, channel_id, frame_size, buf) || return - end + # parse the frame, will return false if a close frame was received + parse_frame(type, channel_id, frame_size, frame_buffer) || return end + rescue IOError, OpenSSL::OpenSSLError, SystemCallError => e + warn "AMQP-Client read error: #{e.inspect}" + nil # ignore read errors ensure @closed = true @replies.close begin @socket.close - rescue IOError + rescue IOError, OpenSSL::OpenSSLError, SystemCallError nil end end private def parse_frame(type, channel_id, frame_size, buf) case type when 1 # method frame - class_id, method_id = buf.unpack("@7 S> S>") + class_id, method_id = buf.unpack("S> S>") case class_id when 10 # connection raise AMQP::Client::Error, "Unexpected channel id #{channel_id} for Connection frame" if channel_id != 0 case method_id when 50 # connection#close - code, text_len = buf.unpack("@11 S> C") - text = buf.byteslice(14, text_len).force_encoding("utf-8") - error_class_id, error_method_id = buf.byteslice(14 + text_len, 4).unpack("S> S>") - warn "Connection closed #{code} #{text} #{error_class_id} #{error_method_id}" - write_bytes FrameBytes.connection_close_ok + @closed = true + code, text_len = buf.unpack("@4 S> C") + text = buf.byteslice(7, text_len).force_encoding("utf-8") + error_class_id, error_method_id = buf.byteslice(7 + text_len, 4).unpack("S> S>") + @channels.each_value { |ch| ch.closed!(code, text, error_class_id, error_method_id) } + begin + write_bytes FrameBytes.connection_close_ok + rescue AMQP::Client::Error + nil # rabbitmq closes the socket after sending Connection::Close, so ignore write errors + end return false when 51 # connection#close-ok + @closed = true @replies.push [:close_ok] return false else raise AMQP::Client::UnsupportedMethodFrame, class_id, method_id end when 20 # channel case method_id when 11 # channel#open-ok @channels[channel_id].reply [:channel_open_ok] when 40 # channel#close - reply_code, reply_text_len = buf.unpack("@11 S> C") - reply_text = buf.byteslice(14, reply_text_len).force_encoding("utf-8") - classid, methodid = buf.byteslice(14 + reply_text_len, 4).unpack("S> S>") + reply_code, reply_text_len = buf.unpack("@4 S> C") + reply_text = buf.byteslice(7, reply_text_len).force_encoding("utf-8") + classid, methodid = buf.byteslice(7 + reply_text_len, 4).unpack("S> S>") channel = @channels.delete(channel_id) channel.closed!(reply_code, reply_text, classid, methodid) + write_bytes FrameBytes.channel_close_ok(channel_id) when 41 # channel#close-ok channel = @channels.delete(channel_id) channel.reply [:channel_close_ok] else raise AMQP::Client::UnsupportedMethodFrame, class_id, method_id end @@ -181,46 +205,46 @@ else raise AMQP::Client::UnsupportedMethodFrame, class_id, method_id end when 50 # queue case method_id when 11 # declare-ok - queue_name_len = buf.unpack1("@11 C") - queue_name = buf.byteslice(12, queue_name_len).force_encoding("utf-8") - message_count, consumer_count = buf.byteslice(12 + queue_name_len, 8).unpack1("L> L>") + queue_name_len = buf.unpack1("@4 C") + queue_name = buf.byteslice(5, queue_name_len).force_encoding("utf-8") + message_count, consumer_count = buf.byteslice(5 + queue_name_len, 8).unpack("L> L>") @channels[channel_id].reply [:queue_declare_ok, queue_name, message_count, consumer_count] when 21 # bind-ok @channels[channel_id].reply [:queue_bind_ok] when 31 # purge-ok @channels[channel_id].reply [:queue_purge_ok] when 41 # delete-ok - message_count = buf.unpack1("@11 L>") + message_count = buf.unpack1("@4 L>") @channels[channel_id].reply [:queue_delete, message_count] when 51 # unbind-ok @channels[channel_id].reply [:queue_unbind_ok] else raise AMQP::Client::UnsupportedMethodFrame.new class_id, method_id end when 60 # basic case method_id when 11 # qos-ok @channels[channel_id].reply [:basic_qos_ok] when 21 # consume-ok - tag_len = buf.unpack1("@11 C") - tag = buf.byteslice(12, tag_len).force_encoding("utf-8") + tag_len = buf.unpack1("@4 C") + tag = buf.byteslice(5, tag_len).force_encoding("utf-8") @channels[channel_id].reply [:basic_consume_ok, tag] when 30 # cancel - tag_len = buf.unpack1("@11 C") - tag = buf.byteslice(12, tag_len).force_encoding("utf-8") - no_wait = buf[12 + tag_len].ord - @channels[channel_id].consumers.fetch(tag).close + tag_len = buf.unpack1("@4 C") + tag = buf.byteslice(5, tag_len).force_encoding("utf-8") + no_wait = buf[5 + tag_len].ord + @channels[channel_id].close_consumer(tag) write_bytes FrameBytes.basic_cancel_ok(@id, tag) unless no_wait == 1 when 31 # cancel-ok - tag_len = buf.unpack1("@11 C") - tag = buf.byteslice(12, tag_len).force_encoding("utf-8") + tag_len = buf.unpack1("@4 C") + tag = buf.byteslice(5, tag_len).force_encoding("utf-8") @channels[channel_id].reply [:basic_cancel_ok, tag] when 50 # return - reply_code, reply_text_len = buf.unpack("@11 S> C") - pos = 14 + reply_code, reply_text_len = buf.unpack("@4 S> C") + pos = 7 reply_text = buf.byteslice(pos, reply_text_len).force_encoding("utf-8") pos += reply_text_len exchange_len = buf[pos].ord pos += 1 exchange = buf.byteslice(pos, exchange_len).force_encoding("utf-8") @@ -228,49 +252,41 @@ routing_key_len = buf[pos].ord pos += 1 routing_key = buf.byteslice(pos, routing_key_len).force_encoding("utf-8") @channels[channel_id].message_returned(reply_code, reply_text, exchange, routing_key) when 60 # deliver - ctag_len = buf[11].ord - consumer_tag = buf.byteslice(12, ctag_len).force_encoding("utf-8") - pos = 12 + ctag_len + ctag_len = buf[4].ord + consumer_tag = buf.byteslice(5, ctag_len).force_encoding("utf-8") + pos = 5 + ctag_len delivery_tag, redelivered, exchange_len = buf.byteslice(pos, 10).unpack("Q> C C") pos += 8 + 1 + 1 exchange = buf.byteslice(pos, exchange_len).force_encoding("utf-8") pos += exchange_len rk_len = buf[pos].ord pos += 1 routing_key = buf.byteslice(pos, rk_len).force_encoding("utf-8") - loop do - if (consumer = @channels[channel_id].consumers[consumer_tag]) - consumer.push [:deliver, delivery_tag, redelivered == 1, exchange, routing_key] - break - else - Thread.pass - end - end + @channels[channel_id].message_delivered(consumer_tag, delivery_tag, redelivered == 1, exchange, routing_key) when 71 # get-ok - delivery_tag, redelivered, exchange_len = buf.unpack("@11 Q> C C") - pos = 21 + delivery_tag, redelivered, exchange_len = buf.unpack("@4 Q> C C") + pos = 14 exchange = buf.byteslice(pos, exchange_len).force_encoding("utf-8") pos += exchange_len routing_key_len = buf[pos].ord pos += 1 routing_key = buf.byteslice(pos, routing_key_len).force_encoding("utf-8") pos += routing_key_len - message_count = buf.byteslice(pos, 4).unpack1("L>") - redelivered = redelivered == 1 - @channels[channel_id].reply [:basic_get_ok, delivery_tag, exchange, routing_key, message_count, redelivered] + _message_count = buf.byteslice(pos, 4).unpack1("L>") + @channels[channel_id].message_delivered(nil, delivery_tag, redelivered == 1, exchange, routing_key) when 72 # get-empty - @channels[channel_id].reply [:basic_get_empty] + @channels[channel_id].basic_get_empty when 80 # ack - delivery_tag, multiple = buf.unpack("@11 Q> C") + delivery_tag, multiple = buf.unpack("@4 Q> C") @channels[channel_id].confirm [:ack, delivery_tag, multiple == 1] when 111 # recover-ok @channels[channel_id].reply [:basic_recover_ok] when 120 # nack - delivery_tag, multiple, requeue = buf.unpack("@11 Q> C C") + delivery_tag, multiple, requeue = buf.unpack("@4 Q> C C") @channels[channel_id].confirm [:nack, delivery_tag, multiple == 1, requeue == 1] else raise AMQP::Client::UnsupportedMethodFrame.new class_id, method_id end when 85 # confirm case method_id @@ -289,24 +305,23 @@ else raise AMQP::Client::UnsupportedMethodFrame.new class_id, method_id end else raise AMQP::Client::UnsupportedMethodFrame.new class_id, method_id end when 2 # header - body_size = buf.unpack1("@11 Q>") - properties = Properties.decode(buf.byteslice(19, buf.bytesize - 20)) - @channels[channel_id].reply [:header, body_size, properties] + body_size = buf.unpack1("@4 Q>") + properties = Properties.decode(buf.byteslice(12, buf.bytesize - 12)) + @channels[channel_id].header_delivered body_size, properties when 3 # body - body = buf.byteslice(7, frame_size) - @channels[channel_id].reply [:body, body] + @channels[channel_id].body_delivered buf else raise AMQP::Client::UnsupportedFrameType, type end true end def expect(expected_frame_type) - frame_type, args = @replies.shift - frame_type == expected_frame_type || raise(UnexpectedFrame.new(expected_frame_type, frame_type)) + frame_type, args = @replies.pop + frame_type == expected_frame_type || raise(AMQP::Client::UnexpectedFrame.new(expected_frame_type, frame_type)) args end def self.establish(socket, user, password, vhost, **options) channel_max, frame_max, heartbeat = nil @@ -318,11 +333,11 @@ rescue IOError, OpenSSL::OpenSSLError, SystemCallError => e raise AMQP::Client::Error, "Could not establish AMQP connection: #{e.message}" end type, channel_id, frame_size = buf.unpack("C S> L>") - frame_end = buf.unpack1("@#{frame_size + 7} C") + frame_end = buf[frame_size + 7].ord raise UnexpectedFrameEndError, frame_end if frame_end != 206 case type when 1 # method frame class_id, method_id = buf.unpack("@7 S> S>") @@ -365,10 +380,10 @@ socket.setsockopt(Socket::SOL_SOCKET, Socket::SO_KEEPALIVE, true) socket.setsockopt(Socket::SOL_TCP, Socket::TCP_KEEPIDLE, 60) socket.setsockopt(Socket::SOL_TCP, Socket::TCP_KEEPINTVL, 10) socket.setsockopt(Socket::SOL_TCP, Socket::TCP_KEEPCNT, 3) rescue StandardError => e - warn "amqp-client: Could not enable TCP keepalive on socket. #{e.inspect}" + warn "AMQP-Client could not enable TCP keepalive on socket. #{e.inspect}" end def self.port_from_env return unless (port = ENV["AMQP_PORT"])