lib/bbk/amqp/utils.rb in bbk-amqp-1.0.0.105683 vs lib/bbk/amqp/utils.rb in bbk-amqp-1.0.0.105722

- old
+ new

@@ -12,20 +12,29 @@ # @param timeout [Integer] in seconds for waiting message message in queue # @raise [Timeout::Error] if queue empty in timeout time duration # @return [Array] array with delivery_info, metadata and payload def self.pop(queue, timeout = 10) unblocker = Queue.new + consumed = false + mx = Mutex.new consumer = queue.subscribe(block: false, manual_ack: true) do |delivery_info, metadata, payload| - message = [ - delivery_info, - metadata.to_hash.with_indifferent_access, - begin - Oj.load(payload).with_indifferent_access - rescue StandardError - payload + mx.synchronize do + if consumed + queue.channel.nack(delivery_info.delivery_tag, false, true) + break end - ] - unblocker << message + consumed = true + message = [ + delivery_info, + metadata.to_hash.with_indifferent_access, + begin + Oj.load(payload).with_indifferent_access + rescue StandardError + payload + end + ] + unblocker << message + end end Thread.new do sleep timeout unblocker << :timeout end