lib/amqp/client.rb in amqp-client-1.1.4 vs lib/amqp/client.rb in amqp-client-1.1.5

- old
+ new

@@ -24,11 +24,10 @@ # @option options [Integer] channel_max (2048) Maxium number of channels the client will be allowed to have open. # Maxium allowed is 65_536. The smallest of the client's and the broker's value will be used. def initialize(uri = "", **options) @uri = uri @options = options - @queues = {} @exchanges = {} @subscriptions = Set.new @connq = SizedQueue.new(1) end @@ -170,18 +169,19 @@ # @!endgroup # @!group Queue actions # Consume messages from a queue # @param queue [String] Name of the queue to subscribe to - # @param no_ack [Boolean] When false messages have to be manually acknowledged (or rejected) - # @param prefetch [Integer] Specify how many messages to prefetch for consumers with no_ack is false - # @param worker_threads [Integer] Number of threads processing messages, - # 0 means that the thread calling this method will be blocked + # @param no_ack [Boolean] When false messages have to be manually acknowledged (or rejected) (default: false) + # @param prefetch [Integer] Specify how many messages to prefetch for consumers with no_ack is false (default: 1) + # @param worker_threads [Integer] Number of threads processing messages (default: 1) # @param arguments [Hash] Custom arguments to the consumer # @yield [Message] Delivered message from the queue # @return [Array<(String, Array<Thread>)>] Returns consumer_tag and an array of worker threads - # @return [nil] When `worker_threads` is 0 the method will return when the consumer is cancelled + # @return [nil] def subscribe(queue, no_ack: false, prefetch: 1, worker_threads: 1, arguments: {}, &blk) + raise ArgumentError, "worker_threads have to be > 0" if worker_threads <= 0 + @subscriptions.add? [queue, no_ack, prefetch, worker_threads, arguments, blk] with_connection do |conn| ch = conn.channel ch.basic_qos(prefetch)