lib/amqp/client.rb in amqp-client-1.1.0 vs lib/amqp/client.rb in amqp-client-1.1.1

- old
+ new

@@ -36,16 +36,22 @@ # @!group Connect and disconnect # Establishes and returns a new AMQP connection # @see Connection#initialize # @return [Connection] + # @example + # connection = AMQP::Client.new("amqps://server.rmq.cloudamqp.com", connection_name: "My connection").connect def connect(read_loop_thread: true) Connection.new(@uri, read_loop_thread: read_loop_thread, **@options) end # Opens an AMQP connection using the high level API, will try to reconnect if successfully connected at first # @return [self] + # @example + # amqp = AMQP::Client.new("amqps://server.rmq.cloudamqp.com") + # amqp.start + # amqp.queue("foobar") def start @stopped = false Thread.new(connect(read_loop_thread: false)) do |conn| Thread.abort_on_exception = true # Raising an unhandled exception is a bug loop do @@ -93,10 +99,14 @@ # messages in the queue will only survive if they are published as persistent # @param auto_delete [Boolean] If true the queue will be deleted when the last consumer stops consuming # (it won't be deleted until at least one consumer has consumed from it) # @param arguments [Hash] Custom arguments, such as queue-ttl etc. # @return [Queue] + # @example + # amqp = AMQP::Client.new.start + # q = amqp.queue("foobar") + # q.publish("body") def queue(name, durable: true, auto_delete: false, arguments: {}) raise ArgumentError, "Currently only supports named, durable queues" if name.empty? @queues.fetch(name) do with_connection do |conn| @@ -106,10 +116,14 @@ end end # Declare an exchange and return a high level Exchange object # @return [Exchange] + # @example + # amqp = AMQP::Client.new.start + # x = amqp.exchange("my.hash.exchange", "x-consistent-hash") + # x.publish("body", "routing-key") def exchange(name, type, durable: true, auto_delete: false, internal: false, arguments: {}) @exchanges.fetch(name) do with_connection do |conn| conn.channel(1).exchange_declare(name, type, durable: durable, auto_delete: auto_delete, internal: internal, arguments: arguments) @@ -170,12 +184,10 @@ @subscriptions.add? [queue, no_ack, prefetch, worker_threads, arguments, blk] with_connection do |conn| ch = conn.channel ch.basic_qos(prefetch) - ch.basic_consume(queue, no_ack: no_ack, worker_threads: worker_threads, arguments: arguments) do |msg| - blk.call(msg) - end + ch.basic_consume(queue, no_ack: no_ack, worker_threads: worker_threads, arguments: arguments, &blk) end end # Bind a queue to an exchange # @param queue [String] Name of the queue to bind