lib/amqp/client.rb in amqp-client-0.3.0 vs lib/amqp/client.rb in amqp-client-1.0.0
- old
+ new
@@ -10,35 +10,45 @@
def initialize(uri, **options)
@uri = uri
@options = options
@queues = {}
+ @exchanges = {}
@subscriptions = Set.new
@connq = SizedQueue.new(1)
end
+ # Opens an AMQP connection, does not try to reconnect
def connect(read_loop_thread: true)
- Connection.connect(@uri, **@options.merge(read_loop_thread: read_loop_thread))
+ Connection.connect(@uri, read_loop_thread: read_loop_thread, **@options)
end
+ # Opens an AMQP connection using the high level API, will try to reconnect
def start
@stopped = false
- Thread.new do
+ Thread.new(connect(read_loop_thread: false)) do |conn|
+ Thread.abort_on_exception = true # Raising an unhandled exception is a bug
loop do
break if @stopped
- conn = connect(read_loop_thread: false)
+ conn ||= connect(read_loop_thread: false)
Thread.new do
# restore connection in another thread, read_loop have to run
conn.channel(1) # reserve channel 1 for publishes
- @subscriptions.each { |args| subscribe(*args) }
+ @subscriptions.each do |queue_name, no_ack, prefetch, wt, args, blk|
+ ch = conn.channel
+ ch.basic_qos(prefetch)
+ ch.basic_consume(queue_name, no_ack: no_ack, worker_threads: wt, arguments: args, &blk)
+ end
@connq << conn
end
conn.read_loop # blocks until connection is closed, then reconnect
- rescue => e
+ rescue AMQP::Client::Error => e
warn "AMQP-Client reconnect error: #{e.inspect}"
sleep @options[:reconnect_interval] || 1
+ ensure
+ conn = nil
end
end
self
end
@@ -47,68 +57,134 @@
conn = @connq.pop
conn.close
nil
end
- def queue(name, arguments: {})
+ def queue(name, durable: true, exclusive: false, auto_delete: false, arguments: {})
raise ArgumentError, "Currently only supports named, durable queues" if name.empty?
@queues.fetch(name) do
with_connection do |conn|
conn.with_channel do |ch| # use a temp channel in case the declaration fails
- ch.queue_declare(name, arguments: arguments)
+ ch.queue_declare(name, durable: durable, exclusive: exclusive, auto_delete: auto_delete, arguments: arguments)
end
end
@queues[name] = Queue.new(self, name)
end
end
+ def exchange(name, type, durable: true, auto_delete: false, internal: false, arguments: {})
+ @exchanges.fetch(name) do
+ with_connection do |conn|
+ conn.with_channel do |ch|
+ ch.exchange_declare(name, type, durable: durable, auto_delete: auto_delete, internal: internal, arguments: arguments)
+ end
+ end
+ @exchanges[name] = Exchange.new(self, name)
+ end
+ end
+
+ # High level representation of an exchange
+ class Exchange
+ def initialize(client, name)
+ @client = client
+ @name = name
+ end
+
+ def publish(body, routing_key, arguments: {})
+ @client.publish(body, @name, routing_key, arguments: arguments)
+ end
+
+ # Bind to another exchange
+ def bind(exchange, routing_key, arguments: {})
+ @client.exchange_bind(@name, exchange, routing_key, arguments: arguments)
+ end
+
+ # Unbind from another exchange
+ def unbind(exchange, routing_key, arguments: {})
+ @client.exchange_unbind(@name, exchange, routing_key, arguments: arguments)
+ end
+
+ def delete
+ @client.delete_exchange(@name)
+ end
+ end
+
def subscribe(queue_name, no_ack: false, prefetch: 1, worker_threads: 1, arguments: {}, &blk)
- @subscriptions.add? [queue_name, no_ack, prefetch, arguments, blk]
+ @subscriptions.add? [queue_name, no_ack, prefetch, worker_threads, arguments, blk]
with_connection do |conn|
ch = conn.channel
ch.basic_qos(prefetch)
ch.basic_consume(queue_name, no_ack: no_ack, worker_threads: worker_threads, arguments: arguments) do |msg|
blk.call(msg)
end
end
end
+ # Publish a (persistent) message and wait for confirmation
def publish(body, exchange, routing_key, **properties)
with_connection do |conn|
- # Use channel 1 for publishes
+ properties = { delivery_mode: 2 }.merge!(properties)
conn.channel(1).basic_publish_confirm(body, exchange, routing_key, **properties)
- rescue
- conn.channel(1) # reopen channel 1 if it raised
- raise
end
- rescue => e
- warn "AMQP-Client error publishing, retrying (#{e.inspect})"
- retry
end
- def bind(queue, exchange, routing_key, **headers)
+ # Publish a (persistent) message but don't wait for a confirmation
+ def publish_and_forget(body, exchange, routing_key, **properties)
with_connection do |conn|
- conn.channel(1).queue_bind(queue, exchange, routing_key, **headers)
+ properties = { delivery_mode: 2 }.merge!(properties)
+ conn.channel(1).basic_publish(body, exchange, routing_key, **properties)
end
end
- def unbind(queue, exchange, routing_key, **headers)
+ def wait_for_confirms
with_connection do |conn|
- conn.channel(1).queue_unbind(queue, exchange, routing_key, **headers)
+ conn.channel(1).wait_for_confirms
end
end
+ def bind(queue, exchange, routing_key, arguments: {})
+ with_connection do |conn|
+ conn.channel(1).queue_bind(queue, exchange, routing_key, arguments: arguments)
+ end
+ end
+
+ def unbind(queue, exchange, routing_key, arguments: {})
+ with_connection do |conn|
+ conn.channel(1).queue_unbind(queue, exchange, routing_key, arguments: arguments)
+ end
+ end
+
+ def exchange_bind(destination, source, routing_key, arguments: {})
+ with_connection do |conn|
+ conn.channel(1).exchange_bind(destination, source, routing_key, arguments: arguments)
+ end
+ end
+
+ def exchange_unbind(destination, source, routing_key, arguments: {})
+ with_connection do |conn|
+ conn.channel(1).exchange_unbind(destination, source, routing_key, arguments: arguments)
+ end
+ end
+
def purge(queue)
with_connection do |conn|
conn.channel(1).queue_purge(queue)
end
end
- def delete_queue(queue)
+ def delete_queue(name)
with_connection do |conn|
- conn.channel(1).queue_delete(queue)
+ conn.channel(1).queue_delete(name)
+ @queues.delete(name)
+ end
+ end
+
+ def delete_exchange(name)
+ with_connection do |conn|
+ conn.channel(1).exchange_delete(name)
+ @exchanges.delete(name)
end
end
# Queue abstraction
class Queue