lib/amqp/client.rb in amqp-client-0.1.0 vs lib/amqp/client.rb in amqp-client-0.2.0
- old
+ new
@@ -1,101 +1,169 @@
# frozen_string_literal: true
-require "socket"
-require "uri"
-require "openssl"
+require "set"
require_relative "client/version"
-require_relative "client/errors"
-require_relative "client/frames"
require_relative "client/connection"
-require_relative "client/channel"
module AMQP
# AMQP 0-9-1 Client
class Client
- def initialize(uri)
- @uri = URI.parse(uri)
- @tls = @uri.scheme == "amqps"
- @port = @uri.port || @tls ? 5671 : 5672
- @host = @uri.host || "localhost"
- @user = @uri.user || "guest"
- @password = @uri.password || "guest"
- @vhost = URI.decode_www_form_component(@uri.path[1..-1] || "/")
- @options = URI.decode_www_form(@uri.query || "").to_h
+ def initialize(uri, **options)
+ @uri = uri
+ @options = options
+
+ @queues = {}
+ @subscriptions = Set.new
+ @connq = SizedQueue.new(1)
end
- def connect
- socket = Socket.tcp @host, @port, connect_timeout: 20, resolv_timeout: 5
- enable_tcp_keepalive(socket)
- if @tls
- context = OpenSSL::SSL::SSLContext.new
- context.verify_mode = OpenSSL::SSL::VERIFY_PEER unless @options["verify_peer"] == "none"
- socket = OpenSSL::SSL::SSLSocket.new(socket, context)
- socket.sync_close = true # closing the TLS socket also closes the TCP socket
- end
- channel_max, frame_max, heartbeat = establish(socket)
- Connection.new(socket, channel_max, frame_max, heartbeat)
+ def connect(read_loop_thread: true)
+ Connection.connect(@uri, **@options.merge(read_loop_thread: read_loop_thread))
end
- private
+ def start
+ @stopped = false
+ Thread.new do
+ loop do
+ break if @stopped
- def establish(socket)
- channel_max, frame_max, heartbeat = nil
- socket.write "AMQP\x00\x00\x09\x01"
- buf = String.new(capacity: 4096)
- loop do
- begin
- socket.readpartial(4096, buf)
- rescue EOFError, IOError, OpenSSL::Error => e
- raise Error, "Could not establish AMQP connection: #{e.message}"
+ conn = connect(read_loop_thread: false)
+ Thread.new do
+ # restore connection in another thread, read_loop have to run
+ conn.channel(1) # reserve channel 1 for publishes
+ @subscriptions.each { |args| subscribe(*args) }
+ @connq << conn
+ end
+ conn.read_loop # blocks until connection is closed, then reconnect
+ rescue => e
+ warn "AMQP-Client reconnect error: #{e.inspect}"
+ sleep @options[:reconnect_interval] || 1
end
+ end
+ self
+ end
- type, channel_id, frame_size = buf.unpack("C S> L>")
- frame_end = buf.unpack1("@#{frame_size + 7} C")
- raise UnexpectedFrameEndError, frame_end if frame_end != 206
+ def stop
+ @stopped = true
+ conn = @connq.pop
+ conn.close
+ nil
+ end
- case type
- when 1 # method frame
- class_id, method_id = buf.unpack("@7 S> S>")
- case class_id
- when 10 # connection
- raise Error, "Unexpected channel id #{channel_id} for Connection frame" if channel_id != 0
+ def queue(name, arguments: {})
+ raise ArgumentError, "Currently only supports named, durable queues" if name.empty?
- case method_id
- when 10 # connection#start
- socket.write FrameBytes.connection_start_ok "\u0000#{@user}\u0000#{@password}"
- when 30 # connection#tune
- channel_max, frame_max, heartbeat = buf.unpack("@11 S> L> S>")
- channel_max = [channel_max, 2048].min
- frame_max = [frame_max, 4096].min
- heartbeat = [heartbeat, 0].min
- socket.write FrameBytes.connection_tune_ok(channel_max, frame_max, heartbeat)
- socket.write FrameBytes.connection_open(@vhost)
- when 41 # connection#open-ok
- return [channel_max, frame_max, heartbeat]
- when 50 # connection#close
- code, text_len = buf.unpack("@11 S> C")
- text, error_class_id, error_method_id = buf.unpack("@14 a#{text_len} S> S>")
- socket.write FrameBytes.connection_close_ok
- raise Error, "Could not establish AMQP connection: #{code} #{text} #{error_class_id} #{error_method_id}"
- else raise Error, "Unexpected class/method: #{class_id} #{method_id}"
- end
- else raise Error, "Unexpected class/method: #{class_id} #{method_id}"
+ @queues.fetch(name) do
+ with_connection do |conn|
+ conn.with_channel do |ch| # use a temp channel in case the declaration fails
+ ch.queue_declare(name, arguments: arguments)
end
- else raise Error, "Unexpected frame type: #{type}"
end
+ @queues[name] = Queue.new(self, name)
end
- rescue StandardError
- socket.close
- raise
end
- def enable_tcp_keepalive(socket)
- socket.setsockopt(Socket::SOL_SOCKET, Socket::SO_KEEPALIVE, true)
- socket.setsockopt(Socket::SOL_TCP, Socket::TCP_KEEPIDLE, 60)
- socket.setsockopt(Socket::SOL_TCP, Socket::TCP_KEEPINTVL, 10)
- socket.setsockopt(Socket::SOL_TCP, Socket::TCP_KEEPCNT, 3)
+ def subscribe(queue_name, no_ack: false, prefetch: 1, worker_threads: 1, arguments: {}, &blk)
+ @subscriptions.add? [queue_name, no_ack, prefetch, arguments, blk]
+
+ with_connection do |conn|
+ ch = conn.channel
+ ch.basic_qos(prefetch)
+ ch.basic_consume(queue_name, no_ack: no_ack, worker_threads: worker_threads, arguments: arguments) do |msg|
+ blk.call(msg)
+ end
+ end
+ end
+
+ def publish(body, exchange, routing_key, **properties)
+ with_connection do |conn|
+ # Use channel 1 for publishes
+ conn.channel(1).basic_publish_confirm(body, exchange, routing_key, **properties)
+ rescue
+ conn.channel(1) # reopen channel 1 if it raised
+ raise
+ end
rescue => e
- warn "amqp-client: Could not enable TCP keepalive on socket. #{e.inspect}"
+ warn "AMQP-Client error publishing, retrying (#{e.inspect})"
+ retry
+ end
+
+ def bind(queue, exchange, routing_key, **headers)
+ with_connection do |conn|
+ conn.channel(1).queue_bind(queue, exchange, routing_key, **headers)
+ end
+ end
+
+ def unbind(queue, exchange, routing_key, **headers)
+ with_connection do |conn|
+ conn.channel(1).queue_unbind(queue, exchange, routing_key, **headers)
+ end
+ end
+
+ def purge(queue)
+ with_connection do |conn|
+ conn.channel(1).queue_purge(queue)
+ end
+ end
+
+ def delete_queue(queue)
+ with_connection do |conn|
+ conn.channel(1).queue_delete(queue)
+ end
+ end
+
+ # Queue abstraction
+ class Queue
+ def initialize(client, name)
+ @client = client
+ @name = name
+ end
+
+ def publish(body, **properties)
+ @client.publish(body, "", @name, **properties)
+ self
+ end
+
+ def subscribe(prefetch: 1, arguments: {}, &blk)
+ @client.subscribe(@name, prefetch: prefetch, arguments: arguments, &blk)
+ self
+ end
+
+ def bind(exchange, routing_key, **headers)
+ @client.bind(@name, exchange, routing_key, **headers)
+ self
+ end
+
+ def unbind(exchange, routing_key, **headers)
+ @client.unbind(@name, exchange, routing_key, **headers)
+ self
+ end
+
+ def purge
+ @client.purge(@name)
+ self
+ end
+
+ def delete
+ @client.delete_queue(@name)
+ nil
+ end
+ end
+
+ private
+
+ def with_connection
+ conn = nil
+ loop do
+ conn = @connq.pop
+ next if conn.closed?
+
+ break
+ end
+ begin
+ yield conn
+ ensure
+ @connq << conn unless conn.closed?
+ end
end
end
end