lib/amqp/client/channel.rb in amqp-client-1.1.4 vs lib/amqp/client/channel.rb in amqp-client-1.1.5

- old
+ new

@@ -73,10 +73,11 @@ @closed = [level, code, reason, classid, methodid] @replies.close @basic_gets.close @unconfirmed_empty.close @consumers.each_value(&:close) + @consumers.each_value(&:clear) # empty the queues too, messages can't be acked anymore nil end # Handle returned messages in this block. If not set the message will just be logged to STDERR # @yield [ReturnMessage] Messages returned by the broker when a publish has failed @@ -198,15 +199,16 @@ end # Purge a queue # @param name [String] Name of the queue # @param no_wait [Boolean] Don't wait for a broker confirmation if true - # @return [nil] + # @return [Integer] Number of messages in queue when purged + # @return [nil] If no_wait was set true def queue_purge(name, no_wait: false) write_bytes FrameBytes.queue_purge(@id, name, no_wait) - expect :queue_purge_ok unless no_wait - nil + message_count, = expect :queue_purge_ok unless no_wait + message_count end # Unbind a queue from an exchange # @param name [String] Name of the queue to unbind # @param exchange [String] Name of the exchange to unbind from @@ -308,26 +310,20 @@ # @param worker_threads [Integer] Number of threads processing messages, # 0 means that the thread calling this method will process the messages and thus this method will block # @yield [Message] Delivered message from the queue # @return [Array<(String, Array<Thread>)>] Returns consumer_tag and an array of worker threads # @return [nil] When `worker_threads` is 0 the method will return when the consumer is cancelled - def basic_consume(queue, tag: "", no_ack: true, exclusive: false, arguments: {}, worker_threads: 1) + def basic_consume(queue, tag: "", no_ack: true, exclusive: false, arguments: {}, worker_threads: 1, &blk) write_bytes FrameBytes.basic_consume(@id, queue, tag, no_ack, exclusive, arguments) tag, = expect(:basic_consume_ok) - q = @consumers[tag] = ::Queue.new + @consumers[tag] = q = ::Queue.new if worker_threads.zero? - loop do - yield (q.pop || break) - end + consume_loop(q, tag, &blk) nil else threads = Array.new(worker_threads) do - Thread.new do - loop do - yield (q.pop || break) - end - end + Thread.new { consume_loop(q, tag, &blk) } end [tag, threads] end end @@ -544,9 +540,24 @@ frame_type, *args = @replies.pop raise Error::Closed.new(@id, *@closed) if frame_type.nil? raise Error::UnexpectedFrame.new(expected_frame_type, frame_type) unless frame_type == expected_frame_type args + end + + def consume_loop(queue, tag) + while (msg = queue.pop) + begin + yield msg + rescue StandardError # cancel the consumer if an uncaught exception is raised + begin + close("Unexpected exception in consumer #{tag} thread", 500) + rescue StandardError # ignore sockets errors while canceling + nil + end + raise # reraise original exception + end + end end end end end end