lib/bbk/amqp/utils.rb in bbk-amqp-1.0.0.105683 vs lib/bbk/amqp/utils.rb in bbk-amqp-1.0.0.105722
- old
+ new
@@ -12,20 +12,29 @@
# @param timeout [Integer] in seconds for waiting message message in queue
# @raise [Timeout::Error] if queue empty in timeout time duration
# @return [Array] array with delivery_info, metadata and payload
def self.pop(queue, timeout = 10)
unblocker = Queue.new
+ consumed = false
+ mx = Mutex.new
consumer = queue.subscribe(block: false, manual_ack: true) do |delivery_info, metadata, payload|
- message = [
- delivery_info,
- metadata.to_hash.with_indifferent_access,
- begin
- Oj.load(payload).with_indifferent_access
- rescue StandardError
- payload
+ mx.synchronize do
+ if consumed
+ queue.channel.nack(delivery_info.delivery_tag, false, true)
+ break
end
- ]
- unblocker << message
+ consumed = true
+ message = [
+ delivery_info,
+ metadata.to_hash.with_indifferent_access,
+ begin
+ Oj.load(payload).with_indifferent_access
+ rescue StandardError
+ payload
+ end
+ ]
+ unblocker << message
+ end
end
Thread.new do
sleep timeout
unblocker << :timeout
end