lib/amqp/client/channel.rb in amqp-client-1.1.4 vs lib/amqp/client/channel.rb in amqp-client-1.1.5
- old
+ new
@@ -73,10 +73,11 @@
@closed = [level, code, reason, classid, methodid]
@replies.close
@basic_gets.close
@unconfirmed_empty.close
@consumers.each_value(&:close)
+ @consumers.each_value(&:clear) # empty the queues too, messages can't be acked anymore
nil
end
# Handle returned messages in this block. If not set the message will just be logged to STDERR
# @yield [ReturnMessage] Messages returned by the broker when a publish has failed
@@ -198,15 +199,16 @@
end
# Purge a queue
# @param name [String] Name of the queue
# @param no_wait [Boolean] Don't wait for a broker confirmation if true
- # @return [nil]
+ # @return [Integer] Number of messages in queue when purged
+ # @return [nil] If no_wait was set true
def queue_purge(name, no_wait: false)
write_bytes FrameBytes.queue_purge(@id, name, no_wait)
- expect :queue_purge_ok unless no_wait
- nil
+ message_count, = expect :queue_purge_ok unless no_wait
+ message_count
end
# Unbind a queue from an exchange
# @param name [String] Name of the queue to unbind
# @param exchange [String] Name of the exchange to unbind from
@@ -308,26 +310,20 @@
# @param worker_threads [Integer] Number of threads processing messages,
# 0 means that the thread calling this method will process the messages and thus this method will block
# @yield [Message] Delivered message from the queue
# @return [Array<(String, Array<Thread>)>] Returns consumer_tag and an array of worker threads
# @return [nil] When `worker_threads` is 0 the method will return when the consumer is cancelled
- def basic_consume(queue, tag: "", no_ack: true, exclusive: false, arguments: {}, worker_threads: 1)
+ def basic_consume(queue, tag: "", no_ack: true, exclusive: false, arguments: {}, worker_threads: 1, &blk)
write_bytes FrameBytes.basic_consume(@id, queue, tag, no_ack, exclusive, arguments)
tag, = expect(:basic_consume_ok)
- q = @consumers[tag] = ::Queue.new
+ @consumers[tag] = q = ::Queue.new
if worker_threads.zero?
- loop do
- yield (q.pop || break)
- end
+ consume_loop(q, tag, &blk)
nil
else
threads = Array.new(worker_threads) do
- Thread.new do
- loop do
- yield (q.pop || break)
- end
- end
+ Thread.new { consume_loop(q, tag, &blk) }
end
[tag, threads]
end
end
@@ -544,9 +540,24 @@
frame_type, *args = @replies.pop
raise Error::Closed.new(@id, *@closed) if frame_type.nil?
raise Error::UnexpectedFrame.new(expected_frame_type, frame_type) unless frame_type == expected_frame_type
args
+ end
+
+ def consume_loop(queue, tag)
+ while (msg = queue.pop)
+ begin
+ yield msg
+ rescue StandardError # cancel the consumer if an uncaught exception is raised
+ begin
+ close("Unexpected exception in consumer #{tag} thread", 500)
+ rescue StandardError # ignore sockets errors while canceling
+ nil
+ end
+ raise # reraise original exception
+ end
+ end
end
end
end
end
end