lib/amqp/client.rb in amqp-client-1.0.0 vs lib/amqp/client.rb in amqp-client-1.0.1

- old
+ new

@@ -1,30 +1,51 @@ # frozen_string_literal: true require "set" require_relative "client/version" require_relative "client/connection" +require_relative "client/exchange" +require_relative "client/queue" +# AMQP 0-9-1 Protocol, this library only implements the Client +# @see Client module AMQP # AMQP 0-9-1 Client + # @see Connection class Client + # Create a new Client object, this won't establish a connection yet, use {#connect} or {#start} for that + # @param uri [String] URL on the format amqp://username:password@hostname/vhost, + # use amqps:// for encrypted connection + # @option options [Boolean] connection_name (PROGRAM_NAME) Set a name for the connection to be able to identify + # the client from the broker + # @option options [Boolean] verify_peer (true) Verify broker's TLS certificate, set to false for self-signed certs + # @option options [Integer] heartbeat (0) Heartbeat timeout, defaults to 0 and relies on TCP keepalive instead + # @option options [Integer] frame_max (131_072) Maximum frame size, + # the smallest of the client's and the broker's values will be used + # @option options [Integer] channel_max (2048) Maxium number of channels the client will be allowed to have open. + # Maxium allowed is 65_536. The smallest of the client's and the broker's value will be used. def initialize(uri, **options) @uri = uri @options = options @queues = {} @exchanges = {} @subscriptions = Set.new @connq = SizedQueue.new(1) end - # Opens an AMQP connection, does not try to reconnect + # @!group Connect and disconnect + + # Establishes a new AMQP connection, does not try to reconnect + # @see Connection.connect + # @return [Connection] def connect(read_loop_thread: true) Connection.connect(@uri, read_loop_thread: read_loop_thread, **@options) end - # Opens an AMQP connection using the high level API, will try to reconnect + # Opens an AMQP connection using the high level API, will try to reconnect if successfully connected at first + # @return [self] def start @stopped = false Thread.new(connect(read_loop_thread: false)) do |conn| Thread.abort_on_exception = true # Raising an unhandled exception is a bug loop do @@ -40,191 +61,203 @@ ch.basic_consume(queue_name, no_ack: no_ack, worker_threads: wt, arguments: args, &blk) end @connq << conn end conn.read_loop # blocks until connection is closed, then reconnect - rescue AMQP::Client::Error => e + rescue Error => e warn "AMQP-Client reconnect error: #{e.inspect}" sleep @options[:reconnect_interval] || 1 ensure conn = nil end end self end + # Close the currently open connection + # @return [nil] def stop + return if @stopped + @stopped = true conn = @connq.pop conn.close nil end - def queue(name, durable: true, exclusive: false, auto_delete: false, arguments: {}) + # @!endgroup + # @!group High level objects + + # Declare a queue + # @param name [String] Name of the queue + # @param durable [Boolean] If true the queue will survive broker restarts, + # messages in the queue will only survive if they are published as persistent + # @param auto_delete [Boolean] If true the queue will be deleted when the last consumer stops consuming + # (it won't be deleted until at least one consumer has consumed from it) + # @param arguments [Hash] Custom arguments, such as queue-ttl etc. + # @return [Queue] + def queue(name, durable: true, auto_delete: false, arguments: {}) raise ArgumentError, "Currently only supports named, durable queues" if name.empty? @queues.fetch(name) do with_connection do |conn| - conn.with_channel do |ch| # use a temp channel in case the declaration fails - ch.queue_declare(name, durable: durable, exclusive: exclusive, auto_delete: auto_delete, arguments: arguments) - end + conn.channel(1).queue_declare(name, durable: durable, auto_delete: auto_delete, arguments: arguments) end @queues[name] = Queue.new(self, name) end end + # Declare an exchange and return a high level Exchange object + # @return [Exchange] def exchange(name, type, durable: true, auto_delete: false, internal: false, arguments: {}) @exchanges.fetch(name) do with_connection do |conn| - conn.with_channel do |ch| - ch.exchange_declare(name, type, durable: durable, auto_delete: auto_delete, internal: internal, arguments: arguments) - end + conn.channel(1).exchange_declare(name, type, durable: durable, auto_delete: auto_delete, + internal: internal, arguments: arguments) end @exchanges[name] = Exchange.new(self, name) end end - # High level representation of an exchange - class Exchange - def initialize(client, name) - @client = client - @name = name - end + # @!endgroup + # @!group Publish - def publish(body, routing_key, arguments: {}) - @client.publish(body, @name, routing_key, arguments: arguments) - end - - # Bind to another exchange - def bind(exchange, routing_key, arguments: {}) - @client.exchange_bind(@name, exchange, routing_key, arguments: arguments) - end - - # Unbind from another exchange - def unbind(exchange, routing_key, arguments: {}) - @client.exchange_unbind(@name, exchange, routing_key, arguments: arguments) - end - - def delete - @client.delete_exchange(@name) - end - end - - def subscribe(queue_name, no_ack: false, prefetch: 1, worker_threads: 1, arguments: {}, &blk) - @subscriptions.add? [queue_name, no_ack, prefetch, worker_threads, arguments, blk] - - with_connection do |conn| - ch = conn.channel - ch.basic_qos(prefetch) - ch.basic_consume(queue_name, no_ack: no_ack, worker_threads: worker_threads, arguments: arguments) do |msg| - blk.call(msg) - end - end - end - # Publish a (persistent) message and wait for confirmation + # @return [nil] def publish(body, exchange, routing_key, **properties) with_connection do |conn| properties = { delivery_mode: 2 }.merge!(properties) conn.channel(1).basic_publish_confirm(body, exchange, routing_key, **properties) end end # Publish a (persistent) message but don't wait for a confirmation + # @return [nil] def publish_and_forget(body, exchange, routing_key, **properties) with_connection do |conn| properties = { delivery_mode: 2 }.merge!(properties) conn.channel(1).basic_publish(body, exchange, routing_key, **properties) end end + # Wait for unconfirmed publishes + # @return [Boolean] True if successful, false if any message negatively acknowledged def wait_for_confirms with_connection do |conn| conn.channel(1).wait_for_confirms end end - def bind(queue, exchange, routing_key, arguments: {}) - with_connection do |conn| - conn.channel(1).queue_bind(queue, exchange, routing_key, arguments: arguments) - end - end + # @!endgroup + # @!group Queue actions - def unbind(queue, exchange, routing_key, arguments: {}) + # Consume messages from a queue + # @param queue [String] Name of the queue to subscribe to + # @param no_ack [Boolean] When false messages have to be manually acknowledged (or rejected) + # @param prefetch [Integer] Specify how many messages to prefetch for consumers with no_ack is false + # @param worker_threads [Integer] Number of threads processing messages, + # 0 means that the thread calling this method will be blocked + # @param arguments [Hash] Custom arguments to the consumer + # @yield [Message] Delivered message from the queue + # @return [Array<(String, Array<Thread>)>] Returns consumer_tag and an array of worker threads + # @return [nil] When `worker_threads` is 0 the method will return when the consumer is cancelled + def subscribe(queue, no_ack: false, prefetch: 1, worker_threads: 1, arguments: {}, &blk) + @subscriptions.add? [queue, no_ack, prefetch, worker_threads, arguments, blk] + with_connection do |conn| - conn.channel(1).queue_unbind(queue, exchange, routing_key, arguments: arguments) + ch = conn.channel + ch.basic_qos(prefetch) + ch.basic_consume(queue, no_ack: no_ack, worker_threads: worker_threads, arguments: arguments) do |msg| + blk.call(msg) + end end end - def exchange_bind(destination, source, routing_key, arguments: {}) + # Bind a queue to an exchange + # @param queue [String] Name of the queue to bind + # @param exchange [String] Name of the exchange to bind to + # @param binding_key [String] Binding key on which messages that match might be routed (depending on exchange type) + # @param arguments [Hash] Message headers to match on (only relevant for header exchanges) + # @return [nil] + def bind(queue, exchange, binding_key, arguments: {}) with_connection do |conn| - conn.channel(1).exchange_bind(destination, source, routing_key, arguments: arguments) + conn.channel(1).queue_bind(queue, exchange, binding_key, arguments: arguments) end end - def exchange_unbind(destination, source, routing_key, arguments: {}) + # Unbind a queue from an exchange + # @param queue [String] Name of the queue to unbind + # @param exchange [String] Name of the exchange to unbind from + # @param binding_key [String] Binding key which the queue is bound to the exchange with + # @param arguments [Hash] Arguments matching the binding that's being removed + # @return [nil] + def unbind(queue, exchange, binding_key, arguments: {}) with_connection do |conn| - conn.channel(1).exchange_unbind(destination, source, routing_key, arguments: arguments) + conn.channel(1).queue_unbind(queue, exchange, binding_key, arguments: arguments) end end + # Purge a queue + # @param queue [String] Name of the queue + # @return [nil] def purge(queue) with_connection do |conn| conn.channel(1).queue_purge(queue) end end - def delete_queue(name) + # Delete a queue + # @param name [String] Name of the queue + # @param if_unused [Boolean] Only delete if the queue doesn't have consumers, raises a ChannelClosed error otherwise + # @param if_empty [Boolean] Only delete if the queue is empty, raises a ChannelClosed error otherwise + # @return [Integer] Number of messages in the queue when deleted + def delete_queue(name, if_unused: false, if_empty: false) with_connection do |conn| - conn.channel(1).queue_delete(name) + msgs = conn.channel(1).queue_delete(name, if_unused: if_unused, if_empty: if_empty) @queues.delete(name) + msgs end end - def delete_exchange(name) + # @!endgroup + # @!group Exchange actions + + # Bind an exchange to an exchange + # @param destination [String] Name of the exchange to bind + # @param source [String] Name of the exchange to bind to + # @param binding_key [String] Binding key on which messages that match might be routed (depending on exchange type) + # @param arguments [Hash] Message headers to match on (only relevant for header exchanges) + # @return [nil] + def exchange_bind(destination, source, binding_key, arguments: {}) with_connection do |conn| - conn.channel(1).exchange_delete(name) - @exchanges.delete(name) + conn.channel(1).exchange_bind(destination, source, binding_key, arguments: arguments) end end - # Queue abstraction - class Queue - def initialize(client, name) - @client = client - @name = name + # Unbind an exchange from an exchange + # @param destination [String] Name of the exchange to unbind + # @param source [String] Name of the exchange to unbind from + # @param binding_key [String] Binding key which the exchange is bound to the exchange with + # @param arguments [Hash] Arguments matching the binding that's being removed + # @return [nil] + def exchange_unbind(destination, source, binding_key, arguments: {}) + with_connection do |conn| + conn.channel(1).exchange_unbind(destination, source, binding_key, arguments: arguments) end + end - def publish(body, **properties) - @client.publish(body, "", @name, **properties) - self - end - - def subscribe(no_ack: false, prefetch: 1, worker_threads: 1, arguments: {}, &blk) - @client.subscribe(@name, no_ack: no_ack, prefetch: prefetch, worker_threads: worker_threads, arguments: arguments, &blk) - self - end - - def bind(exchange, routing_key, **headers) - @client.bind(@name, exchange, routing_key, **headers) - self - end - - def unbind(exchange, routing_key, **headers) - @client.unbind(@name, exchange, routing_key, **headers) - self - end - - def purge - @client.purge(@name) - self - end - - def delete - @client.delete_queue(@name) + # Delete an exchange + # @param name [String] Name of the exchange + # @return [nil] + def delete_exchange(name) + with_connection do |conn| + conn.channel(1).exchange_delete(name) + @exchanges.delete(name) nil end end + + # @!endgroup private def with_connection conn = nil