lib/amqp/client/connection.rb in amqp-client-0.1.0 vs lib/amqp/client/connection.rb in amqp-client-0.2.0
- old
+ new
@@ -1,71 +1,134 @@
# frozen_string_literal: true
+require "socket"
+require "uri"
+require "openssl"
+require_relative "./frames"
+require_relative "./channel"
+require_relative "./errors"
+
module AMQP
- # AMQP Connection
+ # Represents a single AMQP connection
class Connection
- def initialize(socket, channel_max, frame_max, heartbeat)
+ def self.connect(uri, **options)
+ read_loop_thread = options[:read_loop_thread] || true
+
+ uri = URI.parse(uri)
+ tls = uri.scheme == "amqps"
+ port = port_from_env || uri.port || (@tls ? 5671 : 5672)
+ host = uri.host || "localhost"
+ user = uri.user || "guest"
+ password = uri.password || "guest"
+ vhost = URI.decode_www_form_component(uri.path[1..-1] || "/")
+ options = URI.decode_www_form(uri.query || "").map! { |k, v| [k.to_sym, v] }.to_h.merge(options)
+
+ socket = Socket.tcp host, port, connect_timeout: 20, resolv_timeout: 5
+ enable_tcp_keepalive(socket)
+ if tls
+ context = OpenSSL::SSL::SSLContext.new
+ context.verify_mode = OpenSSL::SSL::VERIFY_PEER unless [false, "false", "none"].include? options[:verify_peer]
+ socket = OpenSSL::SSL::SSLSocket.new(socket, context)
+ socket.sync_close = true # closing the TLS socket also closes the TCP socket
+ end
+ channel_max, frame_max, heartbeat = establish(socket, user, password, vhost, **options)
+ Connection.new(socket, channel_max, frame_max, heartbeat, read_loop_thread: read_loop_thread)
+ end
+
+ def initialize(socket, channel_max, frame_max, heartbeat, read_loop_thread: true)
@socket = socket
@channel_max = channel_max
@frame_max = frame_max
@heartbeat = heartbeat
@channels = {}
@closed = false
- @rpc = Queue.new
- Thread.new { read_loop }
+ @replies = Queue.new
+ Thread.new { read_loop } if read_loop_thread
end
- def channel
- id = 1.upto(@channel_max) { |i| break i unless @channels.key? i }
- ch = Channel.new(self, id)
- @channels[id] = ch
+ attr_reader :frame_max
+
+ def channel(id = nil)
+ if id
+ ch = @channels[id] ||= Channel.new(self, id)
+ else
+ id = 1.upto(@channel_max) { |i| break i unless @channels.key? i }
+ ch = @channels[id] = Channel.new(self, id)
+ end
ch.open
end
+ # Declare a new channel, yield, and then close the channel
+ def with_channel
+ ch = channel
+ begin
+ yield ch
+ ensure
+ ch.close
+ end
+ end
+
def close(reason = "", code = 200)
+ return if @closed
+
write_bytes FrameBytes.connection_close(code, reason)
expect(:close_ok)
@closed = true
end
- def write_bytes(bytes)
- @socket.write bytes
+ def closed?
+ @closed
end
- private
+ def write_bytes(*bytes)
+ @socket.write(*bytes)
+ rescue IOError, OpenSSL::OpenSSLError, SystemCallError => e
+ raise AMQP::Client::Error.new("Could not write to socket", cause: e)
+ end
+ # Reads from the socket, required for any kind of progress. Blocks until the connection is closed
def read_loop
socket = @socket
frame_max = @frame_max
buffer = String.new(capacity: frame_max)
loop do
begin
socket.readpartial(frame_max, buffer)
- rescue IOError, EOFError
+ rescue IOError, OpenSSL::OpenSSLError, SystemCallError
break
end
- buf_pos = 0
- while buf_pos < buffer.bytesize
- type, channel_id, frame_size = buffer.unpack("@#{buf_pos}C S> L>")
- frame_end = buffer.unpack1("@#{buf_pos + 7 + frame_size} C")
- raise AMQP::Client::UnexpectedFrameEnd if frame_end != 206
+ pos = 0
+ while pos < buffer.bytesize
+ buffer += socket.read(pos + 8 - buffer.bytesize) if pos + 8 > buffer.bytesize
+ type, channel_id, frame_size = buffer.byteslice(pos, 7).unpack("C S> L>")
+ if frame_size > frame_max
+ raise AMQP::Client::Error, "Frame size #{frame_size} larger than negotiated max frame size #{frame_max}"
+ end
- buf = buffer.byteslice(buf_pos, frame_size + 8)
- buf_pos += frame_size + 8
+ frame_end_pos = pos + 7 + frame_size
+ buffer += socket.read(frame_end_pos - buffer.bytesize + 1) if frame_end_pos + 1 > buffer.bytesize
+ frame_end = buffer[frame_end_pos].ord
+ raise AMQP::Client::UnexpectedFrameEnd, frame_end if frame_end != 206
+
+ buf = buffer.byteslice(pos, frame_size + 8)
+ pos += frame_size + 8
parse_frame(type, channel_id, frame_size, buf) || return
end
end
ensure
@closed = true
+ @replies.close
begin
@socket.close
rescue IOError
nil
end
end
+ private
+
def parse_frame(type, channel_id, frame_size, buf)
case type
when 1 # method frame
class_id, method_id = buf.unpack("@7 S> S>")
case class_id
@@ -73,69 +136,257 @@
raise AMQP::Client::Error, "Unexpected channel id #{channel_id} for Connection frame" if channel_id != 0
case method_id
when 50 # connection#close
code, text_len = buf.unpack("@11 S> C")
- text, error_class_id, error_method_id = buf.unpack("@14 a#{text_len} S> S>")
+ text = buf.byteslice(14, text_len).force_encoding("utf-8")
+ error_class_id, error_method_id = buf.byteslice(14 + text_len, 4).unpack("S> S>")
warn "Connection closed #{code} #{text} #{error_class_id} #{error_method_id}"
write_bytes FrameBytes.connection_close_ok
return false
when 51 # connection#close-ok
- @rpc.push [:close_ok]
+ @replies.push [:close_ok]
return false
else raise AMQP::Client::UnsupportedMethodFrame, class_id, method_id
end
when 20 # channel
case method_id
when 11 # channel#open-ok
- @channels[channel_id].push [:channel_open_ok]
+ @channels[channel_id].reply [:channel_open_ok]
when 40 # channel#close
+ reply_code, reply_text_len = buf.unpack("@11 S> C")
+ reply_text = buf.byteslice(14, reply_text_len).force_encoding("utf-8")
+ classid, methodid = buf.byteslice(14 + reply_text_len, 4).unpack("S> S>")
channel = @channels.delete(channel_id)
- channel&.closed!
+ channel.closed!(reply_code, reply_text, classid, methodid)
when 41 # channel#close-ok
- @channels[channel_id].push [:channel_close_ok]
+ @channels[channel_id].reply [:channel_close_ok]
else raise AMQP::Client::UnsupportedMethodFrame, class_id, method_id
end
+ when 40 # exchange
+ case method_id
+ when 11 # declare-ok
+ @channels[channel_id].reply [:exchange_declare_ok]
+ when 21 # delete-ok
+ @channels[channel_id].reply [:exchange_delete_ok]
+ when 31 # bind-ok
+ @channels[channel_id].reply [:exchange_bind_ok]
+ when 51 # unbind-ok
+ @channels[channel_id].reply [:exchange_unbind_ok]
+ else raise AMQP::Client::UnsupportedMethodFrame, class_id, method_id
+ end
when 50 # queue
case method_id
- when 11 # queue#declare-ok
+ when 11 # declare-ok
queue_name_len = buf.unpack1("@11 C")
- queue_name, message_count, consumer_count = buf.unpack("@12 a#{queue_name_len} L> L>")
- @channels[channel_id].push [:queue_declare_ok, queue_name, message_count, consumer_count]
- else raise AMQP::Client::UnsupportedMethodFrame, class_id, method_id
+ queue_name = buf.byteslice(12, queue_name_len).force_encoding("utf-8")
+ message_count, consumer_count = buf.byteslice(12 + queue_name_len, 8).unpack1("L> L>")
+ @channels[channel_id].reply [:queue_declare_ok, queue_name, message_count, consumer_count]
+ when 21 # bind-ok
+ @channels[channel_id].reply [:queue_bind_ok]
+ when 31 # purge-ok
+ @channels[channel_id].reply [:queue_purge_ok]
+ when 41 # delete-ok
+ message_count = buf.unpack1("@11 L>")
+ @channels[channel_id].reply [:queue_delete, message_count]
+ when 51 # unbind-ok
+ @channels[channel_id].reply [:queue_unbind_ok]
+ else raise AMQP::Client::UnsupportedMethodFrame.new class_id, method_id
end
when 60 # basic
case method_id
+ when 11 # qos-ok
+ @channels[channel_id].reply [:basic_qos_ok]
+ when 21 # consume-ok
+ tag_len = buf.unpack1("@11 C")
+ tag = buf.byteslice(12, tag_len).force_encoding("utf-8")
+ @channels[channel_id].reply [:basic_consume_ok, tag]
+ when 30 # cancel
+ tag_len = buf.unpack1("@11 C")
+ tag = buf.byteslice(12, tag_len).force_encoding("utf-8")
+ no_wait = buf[12 + tag_len].ord
+ @channels[channel_id].consumers.fetch(tag).close
+ write_bytes FrameBytes.basic_cancel_ok(@id, tag) unless no_wait == 1
+ when 31 # cancel-ok
+ tag_len = buf.unpack1("@11 C")
+ tag = buf.byteslice(12, tag_len).force_encoding("utf-8")
+ @channels[channel_id].reply [:basic_cancel_ok, tag]
+ when 50 # return
+ reply_code, reply_text_len = buf.unpack("@11 S> C")
+ pos = 14
+ reply_text = buf.byteslice(pos, reply_text_len).force_encoding("utf-8")
+ pos += reply_text_len
+ exchange_len = buf[pos].ord
+ pos += 1
+ exchange = buf.byteslice(pos, exchange_len).force_encoding("utf-8")
+ pos += exchange_len
+ routing_key_len = buf[pos].ord
+ pos += 1
+ routing_key = buf.byteslice(pos, routing_key_len).force_encoding("utf-8")
+ @channels[channel_id].message_returned(reply_code, reply_text, exchange, routing_key)
+ when 60 # deliver
+ ctag_len = buf[11].ord
+ consumer_tag = buf.byteslice(12, ctag_len).force_encoding("utf-8")
+ pos = 12 + ctag_len
+ delivery_tag, redelivered, exchange_len = buf.byteslice(pos, 10).unpack("Q> C C")
+ pos += 8 + 1 + 1
+ exchange = buf.byteslice(pos, exchange_len).force_encoding("utf-8")
+ pos += exchange_len
+ rk_len = buf[pos].ord
+ pos += 1
+ routing_key = buf.byteslice(pos, rk_len).force_encoding("utf-8")
+ loop do
+ if (consumer = @channels[channel_id].consumers[consumer_tag])
+ consumer.push [:deliver, delivery_tag, redelivered == 1, exchange, routing_key]
+ break
+ else
+ Thread.pass
+ end
+ end
when 71 # get-ok
- delivery_tag, redelivered, exchange_name_len = buf.unpack("@11 Q> C C")
- exchange_name = buf.byteslice(21, exchange_name_len)
- pos = 21 + exchange_name_len
- routing_key_len = buf.unpack1("@#{pos} C")
+ delivery_tag, redelivered, exchange_len = buf.unpack("@11 Q> C C")
+ pos = 21
+ exchange = buf.byteslice(pos, exchange_len).force_encoding("utf-8")
+ pos += exchange_len
+ routing_key_len = buf[pos].ord
pos += 1
- routing_key = buf.byteslice(pos, routing_key_len)
+ routing_key = buf.byteslice(pos, routing_key_len).force_encoding("utf-8")
pos += routing_key_len
- message_count = buf.unpack1("@#{pos} L>")
- @channels[channel_id].push [:basic_get_ok, delivery_tag, exchange_name, routing_key, message_count, redelivered == 1]
+ message_count = buf.byteslice(pos, 4).unpack1("L>")
+ redelivered = redelivered == 1
+ @channels[channel_id].reply [:basic_get_ok, delivery_tag, exchange, routing_key, message_count, redelivered]
when 72 # get-empty
- @channels[channel_id].push [:basic_get_empty]
- else raise AMQP::Client::UnsupportedMethodFrame, class_id, method_id
+ @channels[channel_id].reply [:basic_get_empty]
+ when 80 # ack
+ delivery_tag, multiple = buf.unpack("@11 Q> C")
+ @channels[channel_id].confirm [:ack, delivery_tag, multiple]
+ when 90 # reject
+ delivery_tag, requeue = buf.unpack("@11 Q> C")
+ @channels[channel_id].confirm [:reject, delivery_tag, requeue == 1]
+ when 111 # recover-ok
+ @channels[channel_id].reply [:basic_recover_ok]
+ when 120 # nack
+ delivery_tag, multiple, requeue = buf.unpack("@11 Q> C C")
+ @channels[channel_id].confirm [:nack, delivery_tag, multiple == 1, requeue == 1]
+ else raise AMQP::Client::UnsupportedMethodFrame.new class_id, method_id
end
- else raise AMQP::Client::UnsupportedMethodFrame, class_id, method_id
+ when 85 # confirm
+ case method_id
+ when 11 # select-ok
+ @channels[channel_id].reply [:confirm_select_ok]
+ else raise AMQP::Client::UnsupportedMethodFrame.new class_id, method_id
+ end
+ when 90 # tx
+ case method_id
+ when 11 # select-ok
+ @channels[channel_id].reply [:tx_select_ok]
+ when 21 # commit-ok
+ @channels[channel_id].reply [:tx_commit_ok]
+ when 31 # rollback-ok
+ @channels[channel_id].reply [:tx_rollback_ok]
+ else raise AMQP::Client::UnsupportedMethodFrame.new class_id, method_id
+ end
+ else raise AMQP::Client::UnsupportedMethodFrame.new class_id, method_id
end
when 2 # header
body_size = buf.unpack1("@11 Q>")
- @channels[channel_id].push [:header, body_size, nil]
+ properties = Properties.decode(buf.byteslice(19, buf.bytesize - 20))
+ @channels[channel_id].reply [:header, body_size, properties]
when 3 # body
body = buf.byteslice(7, frame_size)
- @channels[channel_id].push [:body, body]
+ @channels[channel_id].reply [:body, body]
else raise AMQP::Client::UnsupportedFrameType, type
end
true
end
def expect(expected_frame_type)
- frame_type, args = @rpc.shift
- frame_type == expected_frame_type || raise(UnexpectedFrame, expected_frame_type, frame_type)
+ frame_type, args = @replies.shift
+ frame_type == expected_frame_type || raise(UnexpectedFrame.new(expected_frame_type, frame_type))
args
end
+
+ def self.establish(socket, user, password, vhost, **options)
+ channel_max, frame_max, heartbeat = nil
+ socket.write "AMQP\x00\x00\x09\x01"
+ buf = String.new(capacity: 4096)
+ loop do
+ begin
+ socket.readpartial(4096, buf)
+ rescue IOError, OpenSSL::OpenSSLError, SystemCallError => e
+ raise AMQP::Client::Error, "Could not establish AMQP connection: #{e.message}"
+ end
+
+ type, channel_id, frame_size = buf.unpack("C S> L>")
+ frame_end = buf.unpack1("@#{frame_size + 7} C")
+ raise UnexpectedFrameEndError, frame_end if frame_end != 206
+
+ case type
+ when 1 # method frame
+ class_id, method_id = buf.unpack("@7 S> S>")
+ case class_id
+ when 10 # connection
+ raise AMQP::Client::Error, "Unexpected channel id #{channel_id} for Connection frame" if channel_id != 0
+
+ case method_id
+ when 10 # connection#start
+ properties = CLIENT_PROPERTIES.merge({ connection_name: options[:connection_name] })
+ socket.write FrameBytes.connection_start_ok "\u0000#{user}\u0000#{password}", properties
+ when 30 # connection#tune
+ channel_max, frame_max, heartbeat = buf.unpack("@11 S> L> S>")
+ channel_max = [channel_max, 2048].min
+ frame_max = [frame_max, 131_072].min
+ heartbeat = [heartbeat, 0].min
+ socket.write FrameBytes.connection_tune_ok(channel_max, frame_max, heartbeat)
+ socket.write FrameBytes.connection_open(vhost)
+ when 41 # connection#open-ok
+ return [channel_max, frame_max, heartbeat]
+ when 50 # connection#close
+ code, text_len = buf.unpack("@11 S> C")
+ text, error_class_id, error_method_id = buf.unpack("@14 a#{text_len} S> S>")
+ socket.write FrameBytes.connection_close_ok
+ raise AMQP::Client::Error, "Could not establish AMQP connection: #{code} #{text} #{error_class_id} #{error_method_id}"
+ else raise AMQP::Client::Error, "Unexpected class/method: #{class_id} #{method_id}"
+ end
+ else raise AMQP::Client::Error, "Unexpected class/method: #{class_id} #{method_id}"
+ end
+ else raise AMQP::Client::Error, "Unexpected frame type: #{type}"
+ end
+ end
+ rescue StandardError
+ socket.close rescue nil
+ raise
+ end
+
+ def self.enable_tcp_keepalive(socket)
+ socket.setsockopt(Socket::SOL_SOCKET, Socket::SO_KEEPALIVE, true)
+ socket.setsockopt(Socket::SOL_TCP, Socket::TCP_KEEPIDLE, 60)
+ socket.setsockopt(Socket::SOL_TCP, Socket::TCP_KEEPINTVL, 10)
+ socket.setsockopt(Socket::SOL_TCP, Socket::TCP_KEEPCNT, 3)
+ rescue StandardError => e
+ warn "amqp-client: Could not enable TCP keepalive on socket. #{e.inspect}"
+ end
+
+ def self.port_from_env
+ return unless (port = ENV["AMQP_PORT"])
+
+ port.to_i
+ end
+
+ private_class_method :establish, :enable_tcp_keepalive, :port_from_env
+
+ CLIENT_PROPERTIES = {
+ capabilities: {
+ authentication_failure_close: true,
+ publisher_confirms: true,
+ consumer_cancel_notify: true,
+ exchange_exchange_bindings: true,
+ "basic.nack": true,
+ "connection.blocked": true
+ },
+ product: "amqp-client.rb",
+ platform: RUBY_DESCRIPTION,
+ version: AMQP::Client::VERSION,
+ information: "http://github.com/cloudamqp/amqp-client.rb"
+ }.freeze
end
end