lib/amqp/client.rb in amqp-client-0.1.0 vs lib/amqp/client.rb in amqp-client-0.2.0

- old
+ new

@@ -1,101 +1,169 @@ # frozen_string_literal: true -require "socket" -require "uri" -require "openssl" +require "set" require_relative "client/version" -require_relative "client/errors" -require_relative "client/frames" require_relative "client/connection" -require_relative "client/channel" module AMQP # AMQP 0-9-1 Client class Client - def initialize(uri) - @uri = URI.parse(uri) - @tls = @uri.scheme == "amqps" - @port = @uri.port || @tls ? 5671 : 5672 - @host = @uri.host || "localhost" - @user = @uri.user || "guest" - @password = @uri.password || "guest" - @vhost = URI.decode_www_form_component(@uri.path[1..-1] || "/") - @options = URI.decode_www_form(@uri.query || "").to_h + def initialize(uri, **options) + @uri = uri + @options = options + + @queues = {} + @subscriptions = Set.new + @connq = SizedQueue.new(1) end - def connect - socket = Socket.tcp @host, @port, connect_timeout: 20, resolv_timeout: 5 - enable_tcp_keepalive(socket) - if @tls - context = OpenSSL::SSL::SSLContext.new - context.verify_mode = OpenSSL::SSL::VERIFY_PEER unless @options["verify_peer"] == "none" - socket = OpenSSL::SSL::SSLSocket.new(socket, context) - socket.sync_close = true # closing the TLS socket also closes the TCP socket - end - channel_max, frame_max, heartbeat = establish(socket) - Connection.new(socket, channel_max, frame_max, heartbeat) + def connect(read_loop_thread: true) + Connection.connect(@uri, **@options.merge(read_loop_thread: read_loop_thread)) end - private + def start + @stopped = false + Thread.new do + loop do + break if @stopped - def establish(socket) - channel_max, frame_max, heartbeat = nil - socket.write "AMQP\x00\x00\x09\x01" - buf = String.new(capacity: 4096) - loop do - begin - socket.readpartial(4096, buf) - rescue EOFError, IOError, OpenSSL::Error => e - raise Error, "Could not establish AMQP connection: #{e.message}" + conn = connect(read_loop_thread: false) + Thread.new do + # restore connection in another thread, read_loop have to run + conn.channel(1) # reserve channel 1 for publishes + @subscriptions.each { |args| subscribe(*args) } + @connq << conn + end + conn.read_loop # blocks until connection is closed, then reconnect + rescue => e + warn "AMQP-Client reconnect error: #{e.inspect}" + sleep @options[:reconnect_interval] || 1 end + end + self + end - type, channel_id, frame_size = buf.unpack("C S> L>") - frame_end = buf.unpack1("@#{frame_size + 7} C") - raise UnexpectedFrameEndError, frame_end if frame_end != 206 + def stop + @stopped = true + conn = @connq.pop + conn.close + nil + end - case type - when 1 # method frame - class_id, method_id = buf.unpack("@7 S> S>") - case class_id - when 10 # connection - raise Error, "Unexpected channel id #{channel_id} for Connection frame" if channel_id != 0 + def queue(name, arguments: {}) + raise ArgumentError, "Currently only supports named, durable queues" if name.empty? - case method_id - when 10 # connection#start - socket.write FrameBytes.connection_start_ok "\u0000#{@user}\u0000#{@password}" - when 30 # connection#tune - channel_max, frame_max, heartbeat = buf.unpack("@11 S> L> S>") - channel_max = [channel_max, 2048].min - frame_max = [frame_max, 4096].min - heartbeat = [heartbeat, 0].min - socket.write FrameBytes.connection_tune_ok(channel_max, frame_max, heartbeat) - socket.write FrameBytes.connection_open(@vhost) - when 41 # connection#open-ok - return [channel_max, frame_max, heartbeat] - when 50 # connection#close - code, text_len = buf.unpack("@11 S> C") - text, error_class_id, error_method_id = buf.unpack("@14 a#{text_len} S> S>") - socket.write FrameBytes.connection_close_ok - raise Error, "Could not establish AMQP connection: #{code} #{text} #{error_class_id} #{error_method_id}" - else raise Error, "Unexpected class/method: #{class_id} #{method_id}" - end - else raise Error, "Unexpected class/method: #{class_id} #{method_id}" + @queues.fetch(name) do + with_connection do |conn| + conn.with_channel do |ch| # use a temp channel in case the declaration fails + ch.queue_declare(name, arguments: arguments) end - else raise Error, "Unexpected frame type: #{type}" end + @queues[name] = Queue.new(self, name) end - rescue StandardError - socket.close - raise end - def enable_tcp_keepalive(socket) - socket.setsockopt(Socket::SOL_SOCKET, Socket::SO_KEEPALIVE, true) - socket.setsockopt(Socket::SOL_TCP, Socket::TCP_KEEPIDLE, 60) - socket.setsockopt(Socket::SOL_TCP, Socket::TCP_KEEPINTVL, 10) - socket.setsockopt(Socket::SOL_TCP, Socket::TCP_KEEPCNT, 3) + def subscribe(queue_name, no_ack: false, prefetch: 1, worker_threads: 1, arguments: {}, &blk) + @subscriptions.add? [queue_name, no_ack, prefetch, arguments, blk] + + with_connection do |conn| + ch = conn.channel + ch.basic_qos(prefetch) + ch.basic_consume(queue_name, no_ack: no_ack, worker_threads: worker_threads, arguments: arguments) do |msg| + blk.call(msg) + end + end + end + + def publish(body, exchange, routing_key, **properties) + with_connection do |conn| + # Use channel 1 for publishes + conn.channel(1).basic_publish_confirm(body, exchange, routing_key, **properties) + rescue + conn.channel(1) # reopen channel 1 if it raised + raise + end rescue => e - warn "amqp-client: Could not enable TCP keepalive on socket. #{e.inspect}" + warn "AMQP-Client error publishing, retrying (#{e.inspect})" + retry + end + + def bind(queue, exchange, routing_key, **headers) + with_connection do |conn| + conn.channel(1).queue_bind(queue, exchange, routing_key, **headers) + end + end + + def unbind(queue, exchange, routing_key, **headers) + with_connection do |conn| + conn.channel(1).queue_unbind(queue, exchange, routing_key, **headers) + end + end + + def purge(queue) + with_connection do |conn| + conn.channel(1).queue_purge(queue) + end + end + + def delete_queue(queue) + with_connection do |conn| + conn.channel(1).queue_delete(queue) + end + end + + # Queue abstraction + class Queue + def initialize(client, name) + @client = client + @name = name + end + + def publish(body, **properties) + @client.publish(body, "", @name, **properties) + self + end + + def subscribe(prefetch: 1, arguments: {}, &blk) + @client.subscribe(@name, prefetch: prefetch, arguments: arguments, &blk) + self + end + + def bind(exchange, routing_key, **headers) + @client.bind(@name, exchange, routing_key, **headers) + self + end + + def unbind(exchange, routing_key, **headers) + @client.unbind(@name, exchange, routing_key, **headers) + self + end + + def purge + @client.purge(@name) + self + end + + def delete + @client.delete_queue(@name) + nil + end + end + + private + + def with_connection + conn = nil + loop do + conn = @connq.pop + next if conn.closed? + + break + end + begin + yield conn + ensure + @connq << conn unless conn.closed? + end end end end