Sha256: 8e98cb97eac68c0e37e6f3244d37456be6594319b0b6e6a398cf26b8e5bfb7c5
Contents?: true
Size: 1.79 KB
Versions: 6
Compression:
Stored size: 1.79 KB
Contents
require 'thread' module Qsagi class ConfirmedQueue attr_reader :nacked_messages def initialize(queue) @queue = queue @nacked_messages = [] @unconfirmed_messages = {} @wait_for_confirms = false @semaphore = Mutex.new end def connect @queue.connect _confirm_select end def disconnect @queue.disconnect end def push(message) next_sequence_number = _channel.next_publish_seq_no @semaphore.synchronize do @unconfirmed_messages[next_sequence_number] = message end @queue.push(message) @wait_for_confirms = true end def pop(opts={}) @queue.pop(opts) end def wait_for_confirms _channel.wait_for_confirms if _wait_for_confirms? end def _channel @queue.channel end def _confirm_messages!(attributes) if attributes[:is_nack] if attributes[:multiple] @nacked_messages += @unconfirmed_messages.select { |k,v| k <= attributes[:delivery_tag] }.values @unconfirmed_messages.delete_if { |k,v| k <= attributes[:delivery_tag] } else @nacked_messages << @unconfirmed_messages.delete(attributes[:delivery_tag]) end else if attributes[:multiple] @unconfirmed_messages.delete_if { |k,v| k <= attributes[:delivery_tag] } else @unconfirmed_messages.delete(attributes[:delivery_tag]) end end end def _confirm_select callback = lambda do |delivery_tag, multiple, is_nack| @semaphore.synchronize do _confirm_messages!(:delivery_tag => delivery_tag, :multiple => multiple, :is_nack => is_nack) end end _channel.confirm_select(callback) end def _wait_for_confirms? @wait_for_confirms end end end
Version data entries
6 entries across 6 versions & 1 rubygems