lib/amqp/client.rb in amqp-client-1.1.0 vs lib/amqp/client.rb in amqp-client-1.1.1
- old
+ new
@@ -36,16 +36,22 @@
# @!group Connect and disconnect
# Establishes and returns a new AMQP connection
# @see Connection#initialize
# @return [Connection]
+ # @example
+ # connection = AMQP::Client.new("amqps://server.rmq.cloudamqp.com", connection_name: "My connection").connect
def connect(read_loop_thread: true)
Connection.new(@uri, read_loop_thread: read_loop_thread, **@options)
end
# Opens an AMQP connection using the high level API, will try to reconnect if successfully connected at first
# @return [self]
+ # @example
+ # amqp = AMQP::Client.new("amqps://server.rmq.cloudamqp.com")
+ # amqp.start
+ # amqp.queue("foobar")
def start
@stopped = false
Thread.new(connect(read_loop_thread: false)) do |conn|
Thread.abort_on_exception = true # Raising an unhandled exception is a bug
loop do
@@ -93,10 +99,14 @@
# messages in the queue will only survive if they are published as persistent
# @param auto_delete [Boolean] If true the queue will be deleted when the last consumer stops consuming
# (it won't be deleted until at least one consumer has consumed from it)
# @param arguments [Hash] Custom arguments, such as queue-ttl etc.
# @return [Queue]
+ # @example
+ # amqp = AMQP::Client.new.start
+ # q = amqp.queue("foobar")
+ # q.publish("body")
def queue(name, durable: true, auto_delete: false, arguments: {})
raise ArgumentError, "Currently only supports named, durable queues" if name.empty?
@queues.fetch(name) do
with_connection do |conn|
@@ -106,10 +116,14 @@
end
end
# Declare an exchange and return a high level Exchange object
# @return [Exchange]
+ # @example
+ # amqp = AMQP::Client.new.start
+ # x = amqp.exchange("my.hash.exchange", "x-consistent-hash")
+ # x.publish("body", "routing-key")
def exchange(name, type, durable: true, auto_delete: false, internal: false, arguments: {})
@exchanges.fetch(name) do
with_connection do |conn|
conn.channel(1).exchange_declare(name, type, durable: durable, auto_delete: auto_delete,
internal: internal, arguments: arguments)
@@ -170,12 +184,10 @@
@subscriptions.add? [queue, no_ack, prefetch, worker_threads, arguments, blk]
with_connection do |conn|
ch = conn.channel
ch.basic_qos(prefetch)
- ch.basic_consume(queue, no_ack: no_ack, worker_threads: worker_threads, arguments: arguments) do |msg|
- blk.call(msg)
- end
+ ch.basic_consume(queue, no_ack: no_ack, worker_threads: worker_threads, arguments: arguments, &blk)
end
end
# Bind a queue to an exchange
# @param queue [String] Name of the queue to bind