lib/amqp/client.rb in amqp-client-1.1.4 vs lib/amqp/client.rb in amqp-client-1.1.5
- old
+ new
@@ -24,11 +24,10 @@
# @option options [Integer] channel_max (2048) Maxium number of channels the client will be allowed to have open.
# Maxium allowed is 65_536. The smallest of the client's and the broker's value will be used.
def initialize(uri = "", **options)
@uri = uri
@options = options
-
@queues = {}
@exchanges = {}
@subscriptions = Set.new
@connq = SizedQueue.new(1)
end
@@ -170,18 +169,19 @@
# @!endgroup
# @!group Queue actions
# Consume messages from a queue
# @param queue [String] Name of the queue to subscribe to
- # @param no_ack [Boolean] When false messages have to be manually acknowledged (or rejected)
- # @param prefetch [Integer] Specify how many messages to prefetch for consumers with no_ack is false
- # @param worker_threads [Integer] Number of threads processing messages,
- # 0 means that the thread calling this method will be blocked
+ # @param no_ack [Boolean] When false messages have to be manually acknowledged (or rejected) (default: false)
+ # @param prefetch [Integer] Specify how many messages to prefetch for consumers with no_ack is false (default: 1)
+ # @param worker_threads [Integer] Number of threads processing messages (default: 1)
# @param arguments [Hash] Custom arguments to the consumer
# @yield [Message] Delivered message from the queue
# @return [Array<(String, Array<Thread>)>] Returns consumer_tag and an array of worker threads
- # @return [nil] When `worker_threads` is 0 the method will return when the consumer is cancelled
+ # @return [nil]
def subscribe(queue, no_ack: false, prefetch: 1, worker_threads: 1, arguments: {}, &blk)
+ raise ArgumentError, "worker_threads have to be > 0" if worker_threads <= 0
+
@subscriptions.add? [queue, no_ack, prefetch, worker_threads, arguments, blk]
with_connection do |conn|
ch = conn.channel
ch.basic_qos(prefetch)