lib/amqp/client/connection.rb in amqp-client-1.0.0 vs lib/amqp/client/connection.rb in amqp-client-1.0.1

- old
+ new

@@ -6,406 +6,451 @@ require_relative "./frames" require_relative "./channel" require_relative "./errors" module AMQP - # Represents a single AMQP connection - class Connection - def self.connect(uri, read_loop_thread: true, **options) - uri = URI.parse(uri) - tls = uri.scheme == "amqps" - port = port_from_env || uri.port || (tls ? 5671 : 5672) - host = uri.host || "localhost" - user = uri.user || "guest" - password = uri.password || "guest" - vhost = URI.decode_www_form_component(uri.path[1..-1] || "/") - options = URI.decode_www_form(uri.query || "").map! { |k, v| [k.to_sym, v] }.to_h.merge(options) + class Client + # Represents a single established AMQP connection + class Connection + # Establish a connection to an AMQP broker + # @param uri [String] URL on the format amqp://username:password@hostname/vhost, use amqps:// for encrypted connection + # @param read_loop_thread [Boolean] Set to false if you manually want to run the {#read_loop} + # @option options [Boolean] connection_name (PROGRAM_NAME) Set a name for the connection to be able to identify + # the client from the broker + # @option options [Boolean] verify_peer (true) Verify broker's TLS certificate, set to false for self-signed certs + # @option options [Integer] heartbeat (0) Heartbeat timeout, defaults to 0 and relies on TCP keepalive instead + # @option options [Integer] frame_max (131_072) Maximum frame size, + # the smallest of the client's and the broker's values will be used + # @option options [Integer] channel_max (2048) Maxium number of channels the client will be allowed to have open. + # Maxium allowed is 65_536. The smallest of the client's and the broker's value will be used. + # @return [Connection] + def self.connect(uri, read_loop_thread: true, **options) + uri = URI.parse(uri) + tls = uri.scheme == "amqps" + port = port_from_env || uri.port || (tls ? 5671 : 5672) + host = uri.host || "localhost" + user = uri.user || "guest" + password = uri.password || "guest" + vhost = URI.decode_www_form_component(uri.path[1..-1] || "/") + options = URI.decode_www_form(uri.query || "").map! { |k, v| [k.to_sym, v] }.to_h.merge(options) - socket = Socket.tcp host, port, connect_timeout: 20, resolv_timeout: 5 - enable_tcp_keepalive(socket) - if tls - cert_store = OpenSSL::X509::Store.new - cert_store.set_default_paths - context = OpenSSL::SSL::SSLContext.new - context.cert_store = cert_store - context.verify_mode = OpenSSL::SSL::VERIFY_PEER unless [false, "false", "none"].include? options[:verify_peer] - socket = OpenSSL::SSL::SSLSocket.new(socket, context) - socket.sync_close = true # closing the TLS socket also closes the TCP socket - socket.hostname = host # SNI host - socket.connect - socket.post_connection_check(host) || raise(AMQP::Client::Error, "TLS certificate hostname doesn't match requested") + socket = Socket.tcp host, port, connect_timeout: 20, resolv_timeout: 5 + enable_tcp_keepalive(socket) + if tls + cert_store = OpenSSL::X509::Store.new + cert_store.set_default_paths + context = OpenSSL::SSL::SSLContext.new + context.cert_store = cert_store + context.verify_mode = OpenSSL::SSL::VERIFY_PEER unless [false, "false", "none"].include? options[:verify_peer] + socket = OpenSSL::SSL::SSLSocket.new(socket, context) + socket.sync_close = true # closing the TLS socket also closes the TCP socket + socket.hostname = host # SNI host + socket.connect + socket.post_connection_check(host) || raise(Error, "TLS certificate hostname doesn't match requested") + end + channel_max, frame_max, heartbeat = establish(socket, user, password, vhost, options) + Connection.new(socket, channel_max, frame_max, heartbeat, read_loop_thread: read_loop_thread) end - channel_max, frame_max, heartbeat = establish(socket, user, password, vhost, **options) - Connection.new(socket, channel_max, frame_max, heartbeat, read_loop_thread: read_loop_thread) - end - def initialize(socket, channel_max, frame_max, heartbeat, read_loop_thread: true) - @socket = socket - @channel_max = channel_max.zero? ? 65_536 : channel_max - @frame_max = frame_max - @heartbeat = heartbeat - @channels = {} - @closed = false - @replies = Queue.new - @write_lock = Mutex.new - Thread.new { read_loop } if read_loop_thread - end + # Requires an already established TCP/TLS socket + # @api private + def initialize(socket, channel_max, frame_max, heartbeat, read_loop_thread: true) + @socket = socket + @channel_max = channel_max.zero? ? 65_536 : channel_max + @frame_max = frame_max + @heartbeat = heartbeat + @channels = {} + @closed = false + @replies = ::Queue.new + @write_lock = Mutex.new + Thread.new { read_loop } if read_loop_thread + end - attr_reader :frame_max + # The max frame size negotiated between the client and the broker + # @return [Integer] + attr_reader :frame_max - def inspect - "#<#{self.class} @closed=#{@closed} channel_count=#{@channels.size}>" - end + # Custom inspect + # @return [String] + # @api private + def inspect + "#<#{self.class} @closed=#{@closed} channel_count=#{@channels.size}>" + end - def channel(id = nil) - raise ArgumentError, "Channel ID cannot be 0" if id&.zero? - raise ArgumentError, "Channel ID higher than connection's channel max #{@channel_max}" if id && id > @channel_max + # Open an AMQP channel + # @param id [Integer, nil] If nil a new channel will be opened, otherwise an already open channel might be reused + # @return [Channel] + def channel(id = nil) + raise ArgumentError, "Channel ID cannot be 0" if id&.zero? + raise ArgumentError, "Channel ID higher than connection's channel max #{@channel_max}" if id && id > @channel_max - if id - ch = @channels[id] ||= Channel.new(self, id) - else - id = nil - 1.upto(@channel_max) do |i| - break id = i unless @channels.key? i - end - raise AMQP::Client::Error, "Max channels reached" if id.nil? + if id + ch = @channels[id] ||= Channel.new(self, id) + else + 1.upto(@channel_max) do |i| + break id = i unless @channels.key? i + end + raise Error, "Max channels reached" if id.nil? - ch = @channels[id] = Channel.new(self, id) + ch = @channels[id] = Channel.new(self, id) + end + ch.open end - ch.open - end - # Declare a new channel, yield, and then close the channel - def with_channel - ch = channel - begin - yield ch - ensure - ch.close + # Declare a new channel, yield, and then close the channel + # @yield [Channel] + # @return [Object] Whatever was returned by the block + def with_channel + ch = channel + begin + yield ch + ensure + ch.close + end end - end - def close(reason = "", code = 200) - return if @closed + # Gracefully close a connection + # @param reason [String] A reason to close the connection can be logged by the broker + # @param code [Integer] + # @return [nil] + def close(reason: "", code: 200) + return if @closed - @closed = true - write_bytes FrameBytes.connection_close(code, reason) - @channels.each_value { |ch| ch.closed!(code, reason, 0, 0) } - expect(:close_ok) - end + @closed = true + write_bytes FrameBytes.connection_close(code, reason) + @channels.each_value { |ch| ch.closed!(code, reason, 0, 0) } + expect(:close_ok) + nil + end - def closed? - @closed - end + # True if the connection is closed + # @return [Boolean] + def closed? + @closed + end - def write_bytes(*bytes) - if @socket.is_a? OpenSSL::SSL::SSLSocket + # Write byte array(s) directly to the socket (thread-safe) + # @param bytes [String] One or more byte arrays + # @return [Integer] number of bytes written + # @api private + def write_bytes(*bytes) @write_lock.synchronize do @socket.write(*bytes) end - else - @socket.write(*bytes) + rescue IOError, OpenSSL::OpenSSLError, SystemCallError => e + raise Error, "Could not write to socket, #{e.message}" end - rescue IOError, OpenSSL::OpenSSLError, SystemCallError => e - raise AMQP::Client::Error, "Could not write to socket, #{e.message}" - end - # Reads from the socket, required for any kind of progress. Blocks until the connection is closed - def read_loop - # read more often than write so that channel errors crop up early - Thread.current.priority += 1 - socket = @socket - frame_max = @frame_max - frame_start = String.new(capacity: 7) - frame_buffer = String.new(capacity: frame_max) - loop do - socket.read(7, frame_start) - type, channel_id, frame_size = frame_start.unpack("C S> L>") - if frame_size > frame_max - raise AMQP::Client::Error, "Frame size #{frame_size} is larger than negotiated max frame size #{frame_max}" - end + # Reads from the socket, required for any kind of progress. + # Blocks until the connection is closed. Normally run as a background thread automatically. + # @return [nil] + def read_loop + # read more often than write so that channel errors crop up early + Thread.current.priority += 1 + socket = @socket + frame_max = @frame_max + frame_start = String.new(capacity: 7) + frame_buffer = String.new(capacity: frame_max) + loop do + socket.read(7, frame_start) + type, channel_id, frame_size = frame_start.unpack("C S> L>") + frame_max >= frame_size || raise(Error, "Frame size #{frame_size} larger than negotiated max frame size #{frame_max}") - # read the frame content - socket.read(frame_size, frame_buffer) + # read the frame content + socket.read(frame_size, frame_buffer) - # make sure that the frame end is correct - frame_end = socket.readchar.ord - raise AMQP::Client::UnexpectedFrameEnd, frame_end if frame_end != 206 + # make sure that the frame end is correct + frame_end = socket.readchar.ord + raise UnexpectedFrameEnd, frame_end if frame_end != 206 - # parse the frame, will return false if a close frame was received - parse_frame(type, channel_id, frame_size, frame_buffer) || return - end - rescue IOError, OpenSSL::OpenSSLError, SystemCallError => e - warn "AMQP-Client read error: #{e.inspect}" - nil # ignore read errors - ensure - @closed = true - @replies.close - begin - @socket.close - rescue IOError, OpenSSL::OpenSSLError, SystemCallError + # parse the frame, will return false if a close frame was received + parse_frame(type, channel_id, frame_buffer) || return + end nil + rescue IOError, OpenSSL::OpenSSLError, SystemCallError => e + warn "AMQP-Client read error: #{e.inspect}" + nil # ignore read errors + ensure + @closed = true + @replies.close + begin + @socket.close + rescue IOError, OpenSSL::OpenSSLError, SystemCallError + nil + end end - end - private + private - def parse_frame(type, channel_id, frame_size, buf) - case type - when 1 # method frame - class_id, method_id = buf.unpack("S> S>") - case class_id - when 10 # connection - raise AMQP::Client::Error, "Unexpected channel id #{channel_id} for Connection frame" if channel_id != 0 + def parse_frame(type, channel_id, buf) + case type + when 1 # method frame + class_id, method_id = buf.unpack("S> S>") + case class_id + when 10 # connection + raise Error, "Unexpected channel id #{channel_id} for Connection frame" if channel_id != 0 - case method_id - when 50 # connection#close - @closed = true - code, text_len = buf.unpack("@4 S> C") - text = buf.byteslice(7, text_len).force_encoding("utf-8") - error_class_id, error_method_id = buf.byteslice(7 + text_len, 4).unpack("S> S>") - @channels.each_value { |ch| ch.closed!(code, text, error_class_id, error_method_id) } - begin - write_bytes FrameBytes.connection_close_ok - rescue AMQP::Client::Error - nil # rabbitmq closes the socket after sending Connection::Close, so ignore write errors + case method_id + when 50 # connection#close + @closed = true + code, text_len = buf.unpack("@4 S> C") + text = buf.byteslice(7, text_len).force_encoding("utf-8") + error_class_id, error_method_id = buf.byteslice(7 + text_len, 4).unpack("S> S>") + @channels.each_value { |ch| ch.closed!(code, text, error_class_id, error_method_id) } + begin + write_bytes FrameBytes.connection_close_ok + rescue Error + nil # rabbitmq closes the socket after sending Connection::Close, so ignore write errors + end + return false + when 51 # connection#close-ok + @closed = true + @replies.push [:close_ok] + return false + else raise Error::UnsupportedMethodFrame, class_id, method_id end - return false - when 51 # connection#close-ok - @closed = true - @replies.push [:close_ok] - return false - else raise AMQP::Client::UnsupportedMethodFrame, class_id, method_id + when 20 # channel + case method_id + when 11 # channel#open-ok + @channels[channel_id].reply [:channel_open_ok] + when 40 # channel#close + reply_code, reply_text_len = buf.unpack("@4 S> C") + reply_text = buf.byteslice(7, reply_text_len).force_encoding("utf-8") + classid, methodid = buf.byteslice(7 + reply_text_len, 4).unpack("S> S>") + channel = @channels.delete(channel_id) + channel.closed!(reply_code, reply_text, classid, methodid) + write_bytes FrameBytes.channel_close_ok(channel_id) + when 41 # channel#close-ok + channel = @channels.delete(channel_id) + channel.reply [:channel_close_ok] + else raise Error::UnsupportedMethodFrame, class_id, method_id + end + when 40 # exchange + case method_id + when 11 # declare-ok + @channels[channel_id].reply [:exchange_declare_ok] + when 21 # delete-ok + @channels[channel_id].reply [:exchange_delete_ok] + when 31 # bind-ok + @channels[channel_id].reply [:exchange_bind_ok] + when 51 # unbind-ok + @channels[channel_id].reply [:exchange_unbind_ok] + else raise Error::UnsupportedMethodFrame, class_id, method_id + end + when 50 # queue + case method_id + when 11 # declare-ok + queue_name_len = buf.unpack1("@4 C") + queue_name = buf.byteslice(5, queue_name_len).force_encoding("utf-8") + message_count, consumer_count = buf.byteslice(5 + queue_name_len, 8).unpack("L> L>") + @channels[channel_id].reply [:queue_declare_ok, queue_name, message_count, consumer_count] + when 21 # bind-ok + @channels[channel_id].reply [:queue_bind_ok] + when 31 # purge-ok + @channels[channel_id].reply [:queue_purge_ok] + when 41 # delete-ok + message_count = buf.unpack1("@4 L>") + @channels[channel_id].reply [:queue_delete, message_count] + when 51 # unbind-ok + @channels[channel_id].reply [:queue_unbind_ok] + else raise Error::UnsupportedMethodFrame.new class_id, method_id + end + when 60 # basic + case method_id + when 11 # qos-ok + @channels[channel_id].reply [:basic_qos_ok] + when 21 # consume-ok + tag_len = buf.unpack1("@4 C") + tag = buf.byteslice(5, tag_len).force_encoding("utf-8") + @channels[channel_id].reply [:basic_consume_ok, tag] + when 30 # cancel + tag_len = buf.unpack1("@4 C") + tag = buf.byteslice(5, tag_len).force_encoding("utf-8") + no_wait = buf[5 + tag_len].ord == 1 + @channels[channel_id].close_consumer(tag) + write_bytes FrameBytes.basic_cancel_ok(@id, tag) unless no_wait + when 31 # cancel-ok + tag_len = buf.unpack1("@4 C") + tag = buf.byteslice(5, tag_len).force_encoding("utf-8") + @channels[channel_id].reply [:basic_cancel_ok, tag] + when 50 # return + reply_code, reply_text_len = buf.unpack("@4 S> C") + pos = 7 + reply_text = buf.byteslice(pos, reply_text_len).force_encoding("utf-8") + pos += reply_text_len + exchange_len = buf[pos].ord + pos += 1 + exchange = buf.byteslice(pos, exchange_len).force_encoding("utf-8") + pos += exchange_len + routing_key_len = buf[pos].ord + pos += 1 + routing_key = buf.byteslice(pos, routing_key_len).force_encoding("utf-8") + @channels[channel_id].message_returned(reply_code, reply_text, exchange, routing_key) + when 60 # deliver + ctag_len = buf[4].ord + consumer_tag = buf.byteslice(5, ctag_len).force_encoding("utf-8") + pos = 5 + ctag_len + delivery_tag, redelivered, exchange_len = buf.byteslice(pos, 10).unpack("Q> C C") + pos += 8 + 1 + 1 + exchange = buf.byteslice(pos, exchange_len).force_encoding("utf-8") + pos += exchange_len + rk_len = buf[pos].ord + pos += 1 + routing_key = buf.byteslice(pos, rk_len).force_encoding("utf-8") + @channels[channel_id].message_delivered(consumer_tag, delivery_tag, redelivered == 1, exchange, routing_key) + when 71 # get-ok + delivery_tag, redelivered, exchange_len = buf.unpack("@4 Q> C C") + pos = 14 + exchange = buf.byteslice(pos, exchange_len).force_encoding("utf-8") + pos += exchange_len + routing_key_len = buf[pos].ord + pos += 1 + routing_key = buf.byteslice(pos, routing_key_len).force_encoding("utf-8") + pos += routing_key_len + _message_count = buf.byteslice(pos, 4).unpack1("L>") + @channels[channel_id].message_delivered(nil, delivery_tag, redelivered == 1, exchange, routing_key) + when 72 # get-empty + @channels[channel_id].basic_get_empty + when 80 # ack + delivery_tag, multiple = buf.unpack("@4 Q> C") + @channels[channel_id].confirm [:ack, delivery_tag, multiple == 1] + when 111 # recover-ok + @channels[channel_id].reply [:basic_recover_ok] + when 120 # nack + delivery_tag, multiple, requeue = buf.unpack("@4 Q> C C") + @channels[channel_id].confirm [:nack, delivery_tag, multiple == 1, requeue == 1] + else raise Error::UnsupportedMethodFrame.new class_id, method_id + end + when 85 # confirm + case method_id + when 11 # select-ok + @channels[channel_id].reply [:confirm_select_ok] + else raise Error::UnsupportedMethodFrame.new class_id, method_id + end + when 90 # tx + case method_id + when 11 # select-ok + @channels[channel_id].reply [:tx_select_ok] + when 21 # commit-ok + @channels[channel_id].reply [:tx_commit_ok] + when 31 # rollback-ok + @channels[channel_id].reply [:tx_rollback_ok] + else raise Error::UnsupportedMethodFrame.new class_id, method_id + end + else raise Error::UnsupportedMethodFrame.new class_id, method_id end - when 20 # channel - case method_id - when 11 # channel#open-ok - @channels[channel_id].reply [:channel_open_ok] - when 40 # channel#close - reply_code, reply_text_len = buf.unpack("@4 S> C") - reply_text = buf.byteslice(7, reply_text_len).force_encoding("utf-8") - classid, methodid = buf.byteslice(7 + reply_text_len, 4).unpack("S> S>") - channel = @channels.delete(channel_id) - channel.closed!(reply_code, reply_text, classid, methodid) - write_bytes FrameBytes.channel_close_ok(channel_id) - when 41 # channel#close-ok - channel = @channels.delete(channel_id) - channel.reply [:channel_close_ok] - else raise AMQP::Client::UnsupportedMethodFrame, class_id, method_id - end - when 40 # exchange - case method_id - when 11 # declare-ok - @channels[channel_id].reply [:exchange_declare_ok] - when 21 # delete-ok - @channels[channel_id].reply [:exchange_delete_ok] - when 31 # bind-ok - @channels[channel_id].reply [:exchange_bind_ok] - when 51 # unbind-ok - @channels[channel_id].reply [:exchange_unbind_ok] - else raise AMQP::Client::UnsupportedMethodFrame, class_id, method_id - end - when 50 # queue - case method_id - when 11 # declare-ok - queue_name_len = buf.unpack1("@4 C") - queue_name = buf.byteslice(5, queue_name_len).force_encoding("utf-8") - message_count, consumer_count = buf.byteslice(5 + queue_name_len, 8).unpack("L> L>") - @channels[channel_id].reply [:queue_declare_ok, queue_name, message_count, consumer_count] - when 21 # bind-ok - @channels[channel_id].reply [:queue_bind_ok] - when 31 # purge-ok - @channels[channel_id].reply [:queue_purge_ok] - when 41 # delete-ok - message_count = buf.unpack1("@4 L>") - @channels[channel_id].reply [:queue_delete, message_count] - when 51 # unbind-ok - @channels[channel_id].reply [:queue_unbind_ok] - else raise AMQP::Client::UnsupportedMethodFrame.new class_id, method_id - end - when 60 # basic - case method_id - when 11 # qos-ok - @channels[channel_id].reply [:basic_qos_ok] - when 21 # consume-ok - tag_len = buf.unpack1("@4 C") - tag = buf.byteslice(5, tag_len).force_encoding("utf-8") - @channels[channel_id].reply [:basic_consume_ok, tag] - when 30 # cancel - tag_len = buf.unpack1("@4 C") - tag = buf.byteslice(5, tag_len).force_encoding("utf-8") - no_wait = buf[5 + tag_len].ord - @channels[channel_id].close_consumer(tag) - write_bytes FrameBytes.basic_cancel_ok(@id, tag) unless no_wait == 1 - when 31 # cancel-ok - tag_len = buf.unpack1("@4 C") - tag = buf.byteslice(5, tag_len).force_encoding("utf-8") - @channels[channel_id].reply [:basic_cancel_ok, tag] - when 50 # return - reply_code, reply_text_len = buf.unpack("@4 S> C") - pos = 7 - reply_text = buf.byteslice(pos, reply_text_len).force_encoding("utf-8") - pos += reply_text_len - exchange_len = buf[pos].ord - pos += 1 - exchange = buf.byteslice(pos, exchange_len).force_encoding("utf-8") - pos += exchange_len - routing_key_len = buf[pos].ord - pos += 1 - routing_key = buf.byteslice(pos, routing_key_len).force_encoding("utf-8") - @channels[channel_id].message_returned(reply_code, reply_text, exchange, routing_key) - when 60 # deliver - ctag_len = buf[4].ord - consumer_tag = buf.byteslice(5, ctag_len).force_encoding("utf-8") - pos = 5 + ctag_len - delivery_tag, redelivered, exchange_len = buf.byteslice(pos, 10).unpack("Q> C C") - pos += 8 + 1 + 1 - exchange = buf.byteslice(pos, exchange_len).force_encoding("utf-8") - pos += exchange_len - rk_len = buf[pos].ord - pos += 1 - routing_key = buf.byteslice(pos, rk_len).force_encoding("utf-8") - @channels[channel_id].message_delivered(consumer_tag, delivery_tag, redelivered == 1, exchange, routing_key) - when 71 # get-ok - delivery_tag, redelivered, exchange_len = buf.unpack("@4 Q> C C") - pos = 14 - exchange = buf.byteslice(pos, exchange_len).force_encoding("utf-8") - pos += exchange_len - routing_key_len = buf[pos].ord - pos += 1 - routing_key = buf.byteslice(pos, routing_key_len).force_encoding("utf-8") - pos += routing_key_len - _message_count = buf.byteslice(pos, 4).unpack1("L>") - @channels[channel_id].message_delivered(nil, delivery_tag, redelivered == 1, exchange, routing_key) - when 72 # get-empty - @channels[channel_id].basic_get_empty - when 80 # ack - delivery_tag, multiple = buf.unpack("@4 Q> C") - @channels[channel_id].confirm [:ack, delivery_tag, multiple == 1] - when 111 # recover-ok - @channels[channel_id].reply [:basic_recover_ok] - when 120 # nack - delivery_tag, multiple, requeue = buf.unpack("@4 Q> C C") - @channels[channel_id].confirm [:nack, delivery_tag, multiple == 1, requeue == 1] - else raise AMQP::Client::UnsupportedMethodFrame.new class_id, method_id - end - when 85 # confirm - case method_id - when 11 # select-ok - @channels[channel_id].reply [:confirm_select_ok] - else raise AMQP::Client::UnsupportedMethodFrame.new class_id, method_id - end - when 90 # tx - case method_id - when 11 # select-ok - @channels[channel_id].reply [:tx_select_ok] - when 21 # commit-ok - @channels[channel_id].reply [:tx_commit_ok] - when 31 # rollback-ok - @channels[channel_id].reply [:tx_rollback_ok] - else raise AMQP::Client::UnsupportedMethodFrame.new class_id, method_id - end - else raise AMQP::Client::UnsupportedMethodFrame.new class_id, method_id + when 2 # header + body_size = buf.unpack1("@4 Q>") + properties = Properties.decode(buf.byteslice(12, buf.bytesize - 12)) + @channels[channel_id].header_delivered body_size, properties + when 3 # body + @channels[channel_id].body_delivered buf + else raise Error::UnsupportedFrameType, type end - when 2 # header - body_size = buf.unpack1("@4 Q>") - properties = Properties.decode(buf.byteslice(12, buf.bytesize - 12)) - @channels[channel_id].header_delivered body_size, properties - when 3 # body - @channels[channel_id].body_delivered buf - else raise AMQP::Client::UnsupportedFrameType, type + true end - true - end - def expect(expected_frame_type) - frame_type, args = @replies.pop - frame_type == expected_frame_type || raise(AMQP::Client::UnexpectedFrame.new(expected_frame_type, frame_type)) - args - end + def expect(expected_frame_type) + frame_type, args = @replies.pop + if frame_type.nil? + return if expected_frame_type == :close_ok - def self.establish(socket, user, password, vhost, **options) - channel_max, frame_max, heartbeat = nil - socket.write "AMQP\x00\x00\x09\x01" - buf = String.new(capacity: 4096) - loop do - begin - socket.readpartial(4096, buf) - rescue IOError, OpenSSL::OpenSSLError, SystemCallError => e - raise AMQP::Client::Error, "Could not establish AMQP connection: #{e.message}" + raise(Error::ConnectionClosed, "while waiting for #{expected_frame_type}") end + frame_type == expected_frame_type || raise(Error::UnexpectedFrame.new(expected_frame_type, frame_type)) + args + end - type, channel_id, frame_size = buf.unpack("C S> L>") - frame_end = buf[frame_size + 7].ord - raise UnexpectedFrameEndError, frame_end if frame_end != 206 + # Negotiate a connection + # @return [Array<Integer, Integer, Integer>] channel_max, frame_max, heartbeat + def self.establish(socket, user, password, vhost, options) + channel_max, frame_max, heartbeat = nil + socket.write "AMQP\x00\x00\x09\x01" + buf = String.new(capacity: 4096) + loop do + begin + socket.readpartial(4096, buf) + rescue IOError, OpenSSL::OpenSSLError, SystemCallError => e + raise Error, "Could not establish AMQP connection: #{e.message}" + end - case type - when 1 # method frame - class_id, method_id = buf.unpack("@7 S> S>") - case class_id - when 10 # connection - raise AMQP::Client::Error, "Unexpected channel id #{channel_id} for Connection frame" if channel_id != 0 + type, channel_id, frame_size = buf.unpack("C S> L>") + frame_end = buf[frame_size + 7].ord + raise UnexpectedFrameEndError, frame_end if frame_end != 206 - case method_id - when 10 # connection#start - conn_name = options[:connection_name] || $PROGRAM_NAME - properties = CLIENT_PROPERTIES.merge({ connection_name: conn_name }) - socket.write FrameBytes.connection_start_ok "\u0000#{user}\u0000#{password}", properties - when 30 # connection#tune - channel_max, frame_max, heartbeat = buf.unpack("@11 S> L> S>") - channel_max = [channel_max, 2048].min - frame_max = [frame_max, 131_072].min - heartbeat = [heartbeat, 0].min - socket.write FrameBytes.connection_tune_ok(channel_max, frame_max, heartbeat) - socket.write FrameBytes.connection_open(vhost) - when 41 # connection#open-ok - return [channel_max, frame_max, heartbeat] - when 50 # connection#close - code, text_len = buf.unpack("@11 S> C") - text, error_class_id, error_method_id = buf.unpack("@14 a#{text_len} S> S>") - socket.write FrameBytes.connection_close_ok - raise AMQP::Client::Error, "Could not establish AMQP connection: #{code} #{text} #{error_class_id} #{error_method_id}" - else raise AMQP::Client::Error, "Unexpected class/method: #{class_id} #{method_id}" + case type + when 1 # method frame + class_id, method_id = buf.unpack("@7 S> S>") + case class_id + when 10 # connection + raise Error, "Unexpected channel id #{channel_id} for Connection frame" if channel_id != 0 + + case method_id + when 10 # connection#start + conn_name = options[:connection_name] || $PROGRAM_NAME + properties = CLIENT_PROPERTIES.merge({ connection_name: conn_name }) + socket.write FrameBytes.connection_start_ok "\u0000#{user}\u0000#{password}", properties + when 30 # connection#tune + channel_max, frame_max, heartbeat = buf.unpack("@11 S> L> S>") + channel_max = 65_536 if channel_max.zero? + channel_max = [channel_max, options.fetch(:channel_max, 2048).to_i].min + frame_max = [frame_max, options.fetch(:frame_max, 131_072).to_i].min + heartbeat = [heartbeat, options.fetch(:heartbeat, 0).to_i].min + socket.write FrameBytes.connection_tune_ok(channel_max, frame_max, heartbeat) + socket.write FrameBytes.connection_open(vhost) + when 41 # connection#open-ok + return [channel_max, frame_max, heartbeat] + when 50 # connection#close + code, text_len = buf.unpack("@11 S> C") + text, error_class_id, error_method_id = buf.unpack("@14 a#{text_len} S> S>") + socket.write FrameBytes.connection_close_ok + raise Error, "Could not establish AMQP connection: #{code} #{text} #{error_class_id} #{error_method_id}" + else raise Error, "Unexpected class/method: #{class_id} #{method_id}" + end + else raise Error, "Unexpected class/method: #{class_id} #{method_id}" end - else raise AMQP::Client::Error, "Unexpected class/method: #{class_id} #{method_id}" + else raise Error, "Unexpected frame type: #{type}" end - else raise AMQP::Client::Error, "Unexpected frame type: #{type}" end + rescue StandardError => e + begin + socket.close + rescue IOError, OpenSSL::OpenSSLError, SystemCallError + nil + end + raise e end - rescue StandardError - socket.close rescue nil - raise - end - def self.enable_tcp_keepalive(socket) - socket.setsockopt(Socket::SOL_SOCKET, Socket::SO_KEEPALIVE, true) - socket.setsockopt(Socket::SOL_TCP, Socket::TCP_KEEPIDLE, 60) - socket.setsockopt(Socket::SOL_TCP, Socket::TCP_KEEPINTVL, 10) - socket.setsockopt(Socket::SOL_TCP, Socket::TCP_KEEPCNT, 3) - rescue StandardError => e - warn "AMQP-Client could not enable TCP keepalive on socket. #{e.inspect}" - end + def self.enable_tcp_keepalive(socket) + socket.setsockopt(Socket::SOL_SOCKET, Socket::SO_KEEPALIVE, true) + socket.setsockopt(Socket::SOL_TCP, Socket::TCP_KEEPIDLE, 60) + socket.setsockopt(Socket::SOL_TCP, Socket::TCP_KEEPINTVL, 10) + socket.setsockopt(Socket::SOL_TCP, Socket::TCP_KEEPCNT, 3) + rescue StandardError => e + warn "AMQP-Client could not enable TCP keepalive on socket. #{e.inspect}" + end - def self.port_from_env - return unless (port = ENV["AMQP_PORT"]) + def self.port_from_env + return unless (port = ENV["AMQP_PORT"]) - port.to_i - end + port.to_i + end - private_class_method :establish, :enable_tcp_keepalive, :port_from_env + private_class_method :establish, :enable_tcp_keepalive, :port_from_env - CLIENT_PROPERTIES = { - capabilities: { - authentication_failure_close: true, - publisher_confirms: true, - consumer_cancel_notify: true, - exchange_exchange_bindings: true, - "basic.nack": true, - "connection.blocked": true - }, - product: "amqp-client.rb", - platform: RUBY_DESCRIPTION, - version: AMQP::Client::VERSION, - information: "http://github.com/cloudamqp/amqp-client.rb" - }.freeze + CLIENT_PROPERTIES = { + capabilities: { + authentication_failure_close: true, + publisher_confirms: true, + consumer_cancel_notify: true, + exchange_exchange_bindings: true, + "basic.nack": true, + "connection.blocked": true + }, + product: "amqp-client.rb", + platform: RUBY_DESCRIPTION, + version: VERSION, + information: "http://github.com/cloudamqp/amqp-client.rb" + }.freeze + end end end