lib/amqp/client.rb in amqp-client-1.0.0 vs lib/amqp/client.rb in amqp-client-1.0.1
- old
+ new
@@ -1,30 +1,51 @@
# frozen_string_literal: true
require "set"
require_relative "client/version"
require_relative "client/connection"
+require_relative "client/exchange"
+require_relative "client/queue"
+# AMQP 0-9-1 Protocol, this library only implements the Client
+# @see Client
module AMQP
# AMQP 0-9-1 Client
+ # @see Connection
class Client
+ # Create a new Client object, this won't establish a connection yet, use {#connect} or {#start} for that
+ # @param uri [String] URL on the format amqp://username:password@hostname/vhost,
+ # use amqps:// for encrypted connection
+ # @option options [Boolean] connection_name (PROGRAM_NAME) Set a name for the connection to be able to identify
+ # the client from the broker
+ # @option options [Boolean] verify_peer (true) Verify broker's TLS certificate, set to false for self-signed certs
+ # @option options [Integer] heartbeat (0) Heartbeat timeout, defaults to 0 and relies on TCP keepalive instead
+ # @option options [Integer] frame_max (131_072) Maximum frame size,
+ # the smallest of the client's and the broker's values will be used
+ # @option options [Integer] channel_max (2048) Maxium number of channels the client will be allowed to have open.
+ # Maxium allowed is 65_536. The smallest of the client's and the broker's value will be used.
def initialize(uri, **options)
@uri = uri
@options = options
@queues = {}
@exchanges = {}
@subscriptions = Set.new
@connq = SizedQueue.new(1)
end
- # Opens an AMQP connection, does not try to reconnect
+ # @!group Connect and disconnect
+
+ # Establishes a new AMQP connection, does not try to reconnect
+ # @see Connection.connect
+ # @return [Connection]
def connect(read_loop_thread: true)
Connection.connect(@uri, read_loop_thread: read_loop_thread, **@options)
end
- # Opens an AMQP connection using the high level API, will try to reconnect
+ # Opens an AMQP connection using the high level API, will try to reconnect if successfully connected at first
+ # @return [self]
def start
@stopped = false
Thread.new(connect(read_loop_thread: false)) do |conn|
Thread.abort_on_exception = true # Raising an unhandled exception is a bug
loop do
@@ -40,191 +61,203 @@
ch.basic_consume(queue_name, no_ack: no_ack, worker_threads: wt, arguments: args, &blk)
end
@connq << conn
end
conn.read_loop # blocks until connection is closed, then reconnect
- rescue AMQP::Client::Error => e
+ rescue Error => e
warn "AMQP-Client reconnect error: #{e.inspect}"
sleep @options[:reconnect_interval] || 1
ensure
conn = nil
end
end
self
end
+ # Close the currently open connection
+ # @return [nil]
def stop
+ return if @stopped
+
@stopped = true
conn = @connq.pop
conn.close
nil
end
- def queue(name, durable: true, exclusive: false, auto_delete: false, arguments: {})
+ # @!endgroup
+ # @!group High level objects
+
+ # Declare a queue
+ # @param name [String] Name of the queue
+ # @param durable [Boolean] If true the queue will survive broker restarts,
+ # messages in the queue will only survive if they are published as persistent
+ # @param auto_delete [Boolean] If true the queue will be deleted when the last consumer stops consuming
+ # (it won't be deleted until at least one consumer has consumed from it)
+ # @param arguments [Hash] Custom arguments, such as queue-ttl etc.
+ # @return [Queue]
+ def queue(name, durable: true, auto_delete: false, arguments: {})
raise ArgumentError, "Currently only supports named, durable queues" if name.empty?
@queues.fetch(name) do
with_connection do |conn|
- conn.with_channel do |ch| # use a temp channel in case the declaration fails
- ch.queue_declare(name, durable: durable, exclusive: exclusive, auto_delete: auto_delete, arguments: arguments)
- end
+ conn.channel(1).queue_declare(name, durable: durable, auto_delete: auto_delete, arguments: arguments)
end
@queues[name] = Queue.new(self, name)
end
end
+ # Declare an exchange and return a high level Exchange object
+ # @return [Exchange]
def exchange(name, type, durable: true, auto_delete: false, internal: false, arguments: {})
@exchanges.fetch(name) do
with_connection do |conn|
- conn.with_channel do |ch|
- ch.exchange_declare(name, type, durable: durable, auto_delete: auto_delete, internal: internal, arguments: arguments)
- end
+ conn.channel(1).exchange_declare(name, type, durable: durable, auto_delete: auto_delete,
+ internal: internal, arguments: arguments)
end
@exchanges[name] = Exchange.new(self, name)
end
end
- # High level representation of an exchange
- class Exchange
- def initialize(client, name)
- @client = client
- @name = name
- end
+ # @!endgroup
+ # @!group Publish
- def publish(body, routing_key, arguments: {})
- @client.publish(body, @name, routing_key, arguments: arguments)
- end
-
- # Bind to another exchange
- def bind(exchange, routing_key, arguments: {})
- @client.exchange_bind(@name, exchange, routing_key, arguments: arguments)
- end
-
- # Unbind from another exchange
- def unbind(exchange, routing_key, arguments: {})
- @client.exchange_unbind(@name, exchange, routing_key, arguments: arguments)
- end
-
- def delete
- @client.delete_exchange(@name)
- end
- end
-
- def subscribe(queue_name, no_ack: false, prefetch: 1, worker_threads: 1, arguments: {}, &blk)
- @subscriptions.add? [queue_name, no_ack, prefetch, worker_threads, arguments, blk]
-
- with_connection do |conn|
- ch = conn.channel
- ch.basic_qos(prefetch)
- ch.basic_consume(queue_name, no_ack: no_ack, worker_threads: worker_threads, arguments: arguments) do |msg|
- blk.call(msg)
- end
- end
- end
-
# Publish a (persistent) message and wait for confirmation
+ # @return [nil]
def publish(body, exchange, routing_key, **properties)
with_connection do |conn|
properties = { delivery_mode: 2 }.merge!(properties)
conn.channel(1).basic_publish_confirm(body, exchange, routing_key, **properties)
end
end
# Publish a (persistent) message but don't wait for a confirmation
+ # @return [nil]
def publish_and_forget(body, exchange, routing_key, **properties)
with_connection do |conn|
properties = { delivery_mode: 2 }.merge!(properties)
conn.channel(1).basic_publish(body, exchange, routing_key, **properties)
end
end
+ # Wait for unconfirmed publishes
+ # @return [Boolean] True if successful, false if any message negatively acknowledged
def wait_for_confirms
with_connection do |conn|
conn.channel(1).wait_for_confirms
end
end
- def bind(queue, exchange, routing_key, arguments: {})
- with_connection do |conn|
- conn.channel(1).queue_bind(queue, exchange, routing_key, arguments: arguments)
- end
- end
+ # @!endgroup
+ # @!group Queue actions
- def unbind(queue, exchange, routing_key, arguments: {})
+ # Consume messages from a queue
+ # @param queue [String] Name of the queue to subscribe to
+ # @param no_ack [Boolean] When false messages have to be manually acknowledged (or rejected)
+ # @param prefetch [Integer] Specify how many messages to prefetch for consumers with no_ack is false
+ # @param worker_threads [Integer] Number of threads processing messages,
+ # 0 means that the thread calling this method will be blocked
+ # @param arguments [Hash] Custom arguments to the consumer
+ # @yield [Message] Delivered message from the queue
+ # @return [Array<(String, Array<Thread>)>] Returns consumer_tag and an array of worker threads
+ # @return [nil] When `worker_threads` is 0 the method will return when the consumer is cancelled
+ def subscribe(queue, no_ack: false, prefetch: 1, worker_threads: 1, arguments: {}, &blk)
+ @subscriptions.add? [queue, no_ack, prefetch, worker_threads, arguments, blk]
+
with_connection do |conn|
- conn.channel(1).queue_unbind(queue, exchange, routing_key, arguments: arguments)
+ ch = conn.channel
+ ch.basic_qos(prefetch)
+ ch.basic_consume(queue, no_ack: no_ack, worker_threads: worker_threads, arguments: arguments) do |msg|
+ blk.call(msg)
+ end
end
end
- def exchange_bind(destination, source, routing_key, arguments: {})
+ # Bind a queue to an exchange
+ # @param queue [String] Name of the queue to bind
+ # @param exchange [String] Name of the exchange to bind to
+ # @param binding_key [String] Binding key on which messages that match might be routed (depending on exchange type)
+ # @param arguments [Hash] Message headers to match on (only relevant for header exchanges)
+ # @return [nil]
+ def bind(queue, exchange, binding_key, arguments: {})
with_connection do |conn|
- conn.channel(1).exchange_bind(destination, source, routing_key, arguments: arguments)
+ conn.channel(1).queue_bind(queue, exchange, binding_key, arguments: arguments)
end
end
- def exchange_unbind(destination, source, routing_key, arguments: {})
+ # Unbind a queue from an exchange
+ # @param queue [String] Name of the queue to unbind
+ # @param exchange [String] Name of the exchange to unbind from
+ # @param binding_key [String] Binding key which the queue is bound to the exchange with
+ # @param arguments [Hash] Arguments matching the binding that's being removed
+ # @return [nil]
+ def unbind(queue, exchange, binding_key, arguments: {})
with_connection do |conn|
- conn.channel(1).exchange_unbind(destination, source, routing_key, arguments: arguments)
+ conn.channel(1).queue_unbind(queue, exchange, binding_key, arguments: arguments)
end
end
+ # Purge a queue
+ # @param queue [String] Name of the queue
+ # @return [nil]
def purge(queue)
with_connection do |conn|
conn.channel(1).queue_purge(queue)
end
end
- def delete_queue(name)
+ # Delete a queue
+ # @param name [String] Name of the queue
+ # @param if_unused [Boolean] Only delete if the queue doesn't have consumers, raises a ChannelClosed error otherwise
+ # @param if_empty [Boolean] Only delete if the queue is empty, raises a ChannelClosed error otherwise
+ # @return [Integer] Number of messages in the queue when deleted
+ def delete_queue(name, if_unused: false, if_empty: false)
with_connection do |conn|
- conn.channel(1).queue_delete(name)
+ msgs = conn.channel(1).queue_delete(name, if_unused: if_unused, if_empty: if_empty)
@queues.delete(name)
+ msgs
end
end
- def delete_exchange(name)
+ # @!endgroup
+ # @!group Exchange actions
+
+ # Bind an exchange to an exchange
+ # @param destination [String] Name of the exchange to bind
+ # @param source [String] Name of the exchange to bind to
+ # @param binding_key [String] Binding key on which messages that match might be routed (depending on exchange type)
+ # @param arguments [Hash] Message headers to match on (only relevant for header exchanges)
+ # @return [nil]
+ def exchange_bind(destination, source, binding_key, arguments: {})
with_connection do |conn|
- conn.channel(1).exchange_delete(name)
- @exchanges.delete(name)
+ conn.channel(1).exchange_bind(destination, source, binding_key, arguments: arguments)
end
end
- # Queue abstraction
- class Queue
- def initialize(client, name)
- @client = client
- @name = name
+ # Unbind an exchange from an exchange
+ # @param destination [String] Name of the exchange to unbind
+ # @param source [String] Name of the exchange to unbind from
+ # @param binding_key [String] Binding key which the exchange is bound to the exchange with
+ # @param arguments [Hash] Arguments matching the binding that's being removed
+ # @return [nil]
+ def exchange_unbind(destination, source, binding_key, arguments: {})
+ with_connection do |conn|
+ conn.channel(1).exchange_unbind(destination, source, binding_key, arguments: arguments)
end
+ end
- def publish(body, **properties)
- @client.publish(body, "", @name, **properties)
- self
- end
-
- def subscribe(no_ack: false, prefetch: 1, worker_threads: 1, arguments: {}, &blk)
- @client.subscribe(@name, no_ack: no_ack, prefetch: prefetch, worker_threads: worker_threads, arguments: arguments, &blk)
- self
- end
-
- def bind(exchange, routing_key, **headers)
- @client.bind(@name, exchange, routing_key, **headers)
- self
- end
-
- def unbind(exchange, routing_key, **headers)
- @client.unbind(@name, exchange, routing_key, **headers)
- self
- end
-
- def purge
- @client.purge(@name)
- self
- end
-
- def delete
- @client.delete_queue(@name)
+ # Delete an exchange
+ # @param name [String] Name of the exchange
+ # @return [nil]
+ def delete_exchange(name)
+ with_connection do |conn|
+ conn.channel(1).exchange_delete(name)
+ @exchanges.delete(name)
nil
end
end
+
+ # @!endgroup
private
def with_connection
conn = nil