lib/bbk/amqp/utils.rb in bbk-amqp-1.0.0.99701 vs lib/bbk/amqp/utils.rb in bbk-amqp-1.0.0.105635
- old
+ new
@@ -12,11 +12,11 @@
# @param timeout [Integer] in seconds for waiting message message in queue
# @raise [Timeout::Error] if queue empty in timeout time duration
# @return [Array] array with delivery_info, metadata and payload
def self.pop(queue, timeout = 10)
unblocker = Queue.new
- consumer = queue.subscribe(block: false) do |delivery_info, metadata, payload|
+ consumer = queue.subscribe(block: false, manual_ack: true) do |delivery_info, metadata, payload|
message = [
delivery_info,
metadata.to_hash.with_indifferent_access,
begin
Oj.load(payload).with_indifferent_access
@@ -31,10 +31,10 @@
unblocker << :timeout
end
result = unblocker.pop
consumer.cancel
raise ::Timeout::Error if result == :timeout
-
+ queue.channel.ack(result[0].delivery_tag)
result
end
# Extract CN certificate attribute from certificate path
# @param cert_path [String] path to certificate file