lib/amqp/client.rb in amqp-client-0.3.0 vs lib/amqp/client.rb in amqp-client-1.0.0

- old
+ new

@@ -10,35 +10,45 @@ def initialize(uri, **options) @uri = uri @options = options @queues = {} + @exchanges = {} @subscriptions = Set.new @connq = SizedQueue.new(1) end + # Opens an AMQP connection, does not try to reconnect def connect(read_loop_thread: true) - Connection.connect(@uri, **@options.merge(read_loop_thread: read_loop_thread)) + Connection.connect(@uri, read_loop_thread: read_loop_thread, **@options) end + # Opens an AMQP connection using the high level API, will try to reconnect def start @stopped = false - Thread.new do + Thread.new(connect(read_loop_thread: false)) do |conn| + Thread.abort_on_exception = true # Raising an unhandled exception is a bug loop do break if @stopped - conn = connect(read_loop_thread: false) + conn ||= connect(read_loop_thread: false) Thread.new do # restore connection in another thread, read_loop have to run conn.channel(1) # reserve channel 1 for publishes - @subscriptions.each { |args| subscribe(*args) } + @subscriptions.each do |queue_name, no_ack, prefetch, wt, args, blk| + ch = conn.channel + ch.basic_qos(prefetch) + ch.basic_consume(queue_name, no_ack: no_ack, worker_threads: wt, arguments: args, &blk) + end @connq << conn end conn.read_loop # blocks until connection is closed, then reconnect - rescue => e + rescue AMQP::Client::Error => e warn "AMQP-Client reconnect error: #{e.inspect}" sleep @options[:reconnect_interval] || 1 + ensure + conn = nil end end self end @@ -47,68 +57,134 @@ conn = @connq.pop conn.close nil end - def queue(name, arguments: {}) + def queue(name, durable: true, exclusive: false, auto_delete: false, arguments: {}) raise ArgumentError, "Currently only supports named, durable queues" if name.empty? @queues.fetch(name) do with_connection do |conn| conn.with_channel do |ch| # use a temp channel in case the declaration fails - ch.queue_declare(name, arguments: arguments) + ch.queue_declare(name, durable: durable, exclusive: exclusive, auto_delete: auto_delete, arguments: arguments) end end @queues[name] = Queue.new(self, name) end end + def exchange(name, type, durable: true, auto_delete: false, internal: false, arguments: {}) + @exchanges.fetch(name) do + with_connection do |conn| + conn.with_channel do |ch| + ch.exchange_declare(name, type, durable: durable, auto_delete: auto_delete, internal: internal, arguments: arguments) + end + end + @exchanges[name] = Exchange.new(self, name) + end + end + + # High level representation of an exchange + class Exchange + def initialize(client, name) + @client = client + @name = name + end + + def publish(body, routing_key, arguments: {}) + @client.publish(body, @name, routing_key, arguments: arguments) + end + + # Bind to another exchange + def bind(exchange, routing_key, arguments: {}) + @client.exchange_bind(@name, exchange, routing_key, arguments: arguments) + end + + # Unbind from another exchange + def unbind(exchange, routing_key, arguments: {}) + @client.exchange_unbind(@name, exchange, routing_key, arguments: arguments) + end + + def delete + @client.delete_exchange(@name) + end + end + def subscribe(queue_name, no_ack: false, prefetch: 1, worker_threads: 1, arguments: {}, &blk) - @subscriptions.add? [queue_name, no_ack, prefetch, arguments, blk] + @subscriptions.add? [queue_name, no_ack, prefetch, worker_threads, arguments, blk] with_connection do |conn| ch = conn.channel ch.basic_qos(prefetch) ch.basic_consume(queue_name, no_ack: no_ack, worker_threads: worker_threads, arguments: arguments) do |msg| blk.call(msg) end end end + # Publish a (persistent) message and wait for confirmation def publish(body, exchange, routing_key, **properties) with_connection do |conn| - # Use channel 1 for publishes + properties = { delivery_mode: 2 }.merge!(properties) conn.channel(1).basic_publish_confirm(body, exchange, routing_key, **properties) - rescue - conn.channel(1) # reopen channel 1 if it raised - raise end - rescue => e - warn "AMQP-Client error publishing, retrying (#{e.inspect})" - retry end - def bind(queue, exchange, routing_key, **headers) + # Publish a (persistent) message but don't wait for a confirmation + def publish_and_forget(body, exchange, routing_key, **properties) with_connection do |conn| - conn.channel(1).queue_bind(queue, exchange, routing_key, **headers) + properties = { delivery_mode: 2 }.merge!(properties) + conn.channel(1).basic_publish(body, exchange, routing_key, **properties) end end - def unbind(queue, exchange, routing_key, **headers) + def wait_for_confirms with_connection do |conn| - conn.channel(1).queue_unbind(queue, exchange, routing_key, **headers) + conn.channel(1).wait_for_confirms end end + def bind(queue, exchange, routing_key, arguments: {}) + with_connection do |conn| + conn.channel(1).queue_bind(queue, exchange, routing_key, arguments: arguments) + end + end + + def unbind(queue, exchange, routing_key, arguments: {}) + with_connection do |conn| + conn.channel(1).queue_unbind(queue, exchange, routing_key, arguments: arguments) + end + end + + def exchange_bind(destination, source, routing_key, arguments: {}) + with_connection do |conn| + conn.channel(1).exchange_bind(destination, source, routing_key, arguments: arguments) + end + end + + def exchange_unbind(destination, source, routing_key, arguments: {}) + with_connection do |conn| + conn.channel(1).exchange_unbind(destination, source, routing_key, arguments: arguments) + end + end + def purge(queue) with_connection do |conn| conn.channel(1).queue_purge(queue) end end - def delete_queue(queue) + def delete_queue(name) with_connection do |conn| - conn.channel(1).queue_delete(queue) + conn.channel(1).queue_delete(name) + @queues.delete(name) + end + end + + def delete_exchange(name) + with_connection do |conn| + conn.channel(1).exchange_delete(name) + @exchanges.delete(name) end end # Queue abstraction class Queue