Sha256: 8e98cb97eac68c0e37e6f3244d37456be6594319b0b6e6a398cf26b8e5bfb7c5

Contents?: true

Size: 1.79 KB

Versions: 6

Compression:

Stored size: 1.79 KB

Contents

require 'thread'

module Qsagi
  # Decorates a queue object so that every pushed message is tracked until
  # the channel's confirm callback reports it as acked or nacked.
  #
  # Messages are remembered under the channel's `next_publish_seq_no` at the
  # time of the push. When the callback installed by #connect fires, the
  # matching entries are dropped; entries reported with `is_nack` are
  # collected in #nacked_messages.
  #
  # NOTE(review): the wrapped queue's channel is presumably a Bunny channel
  # (`confirm_select`, `next_publish_seq_no`, `wait_for_confirms`) -- confirm
  # against the queue implementation.
  class ConfirmedQueue
    # @return [Array] messages whose confirm callback reported a nack
    attr_reader :nacked_messages

    # @param queue [Object] wrapped queue; must respond to `connect`,
    #   `disconnect`, `push`, `pop` and `channel`
    def initialize(queue)
      @queue = queue
      @nacked_messages = []
      @unconfirmed_messages = {}
      @wait_for_confirms = false
      @semaphore = Mutex.new
    end

    # Connects the wrapped queue and installs the confirm callback on its
    # channel.
    def connect
      @queue.connect
      _confirm_select
    end

    # Disconnects the wrapped queue.
    def disconnect
      @queue.disconnect
    end

    # Publishes +message+ through the wrapped queue and remembers it as
    # unconfirmed under the channel's next publish sequence number.
    #
    # The message is recorded before it is handed to the queue; presumably so
    # that a confirm arriving right after the publish already finds the entry.
    #
    # @param message [Object] payload forwarded to the wrapped queue
    def push(message)
      sequence_number = _channel.next_publish_seq_no
      @semaphore.synchronize { @unconfirmed_messages[sequence_number] = message }
      @queue.push(message)
      @wait_for_confirms = true
    end

    # Delegates to the wrapped queue's `pop`.
    #
    # @param opts [Hash] options passed through unchanged
    def pop(opts={})
      @queue.pop(opts)
    end

    # Delegates to the channel's `wait_for_confirms`, but only once at least
    # one message has been pushed through this object.
    #
    # @return [Object, nil] the channel's result, or nil when nothing was pushed
    def wait_for_confirms
      _channel.wait_for_confirms if _wait_for_confirms?
    end

    # @return [Object] the wrapped queue's channel
    def _channel
      @queue.channel
    end

    # Applies one confirm notification to the tracked messages.
    #
    # Only delivery tags that are still tracked are affected, so a confirm for
    # an unknown or already handled tag never adds a spurious nil entry to
    # #nacked_messages.
    #
    # The caller is expected to hold @semaphore (the confirm callback does).
    #
    # @param attributes [Hash] with keys :delivery_tag, :multiple and :is_nack
    def _confirm_messages!(attributes)
      confirmed = _remove_unconfirmed(attributes[:delivery_tag], attributes[:multiple])
      @nacked_messages.concat(confirmed) if attributes[:is_nack]
    end

    # Registers a callback on the channel that feeds every confirm
    # notification into #_confirm_messages! while holding @semaphore.
    def _confirm_select
      callback = lambda do |delivery_tag, multiple, is_nack|
        @semaphore.synchronize do
          _confirm_messages!(:delivery_tag => delivery_tag, :multiple => multiple, :is_nack => is_nack)
        end
      end

      _channel.confirm_select(callback)
    end

    # @return [Boolean] true once #push has been called at least once
    def _wait_for_confirms?
      @wait_for_confirms
    end

    private

    # Removes the tracked messages addressed by a confirm and returns them.
    #
    # @param delivery_tag [Integer] sequence number reported by the confirm
    # @param multiple [Boolean] when truthy, every tracked tag less than or
    #   equal to +delivery_tag+ is addressed; otherwise only +delivery_tag+
    # @return [Array] removed messages in the order they were pushed; empty
    #   when no tracked tag matched
    def _remove_unconfirmed(delivery_tag, multiple)
      if multiple
        # Collect the keys first so the hash is not mutated while iterating.
        tags = @unconfirmed_messages.keys.select { |tag| tag <= delivery_tag }
        tags.map { |tag| @unconfirmed_messages.delete(tag) }
      elsif @unconfirmed_messages.key?(delivery_tag)
        # key? (rather than a nil check on the value) keeps a message that
        # was genuinely pushed as nil distinguishable from "not tracked".
        [@unconfirmed_messages.delete(delivery_tag)]
      else
        []
      end
    end
  end
end

Version data entries

6 entries across 6 versions & 1 rubygems

Version Path
qsagi-0.2.3 lib/qsagi/confirmed_queue.rb
qsagi-0.2.2 lib/qsagi/confirmed_queue.rb
qsagi-0.2.1 lib/qsagi/confirmed_queue.rb
qsagi-0.2.0 lib/qsagi/confirmed_queue.rb
qsagi-0.1.3 lib/qsagi/confirmed_queue.rb
qsagi-0.1.2 lib/qsagi/confirmed_queue.rb