Sha256: 02eb8e975548d06fc7eb82fde8297a18300e531c4d501d66139e6ffc6381d812

Contents?: true

Size: 1.58 KB

Versions: 2

Compression:

Stored size: 1.58 KB

Contents

module Qsagi
  class ConfirmedQueue
    attr_reader :nacked_messages

    def initialize(queue)
      @queue = queue
      @nacked_messages = []
      @unconfirmed_messages = {}
      @wait_for_confirms = false
    end

    def connect
      @queue.connect
      _confirm_select
    end

    def disconnect
      @queue.disconnect
    end

    def push(message)
      @unconfirmed_messages[_channel.next_publish_seq_no] = message
      @queue.push(message)
      @wait_for_confirms = true
    end

    def pop(opts={})
      @queue.pop(opts)
    end

    def wait_for_confirms
      _channel.wait_for_confirms if _wait_for_confirms?
    end

    def _channel
      @queue.channel
    end

    def _confirm_messages!(attributes)
      if attributes[:is_nack]
        if attributes[:multiple]
          @nacked_messages += @unconfirmed_messages.select { |k,v| k <= attributes[:delivery_tag] }.values
          @unconfirmed_messages.delete_if { |k,v| k <= attributes[:delivery_tag] }
        else
          @nacked_messages << @unconfirmed_messages.delete(attributes[:delivery_tag])
        end
      else
        if attributes[:multiple]
          @unconfirmed_messages.delete_if { |k,v| k <= attributes[:delivery_tag] }
        else
          @unconfirmed_messages.delete(attributes[:delivery_tag])
        end
      end
    end

    def _confirm_select
      _channel.confirm_select lambda { |delivery_tag, multiple, is_nack|
        _confirm_messages!(:delivery_tag => delivery_tag, :multiple => multiple, :is_nack => is_nack)
      }
    end

    def _wait_for_confirms?
      @wait_for_confirms
    end
  end
end

Version data entries

2 entries across 2 versions & 1 rubygems

Version Path
qsagi-0.1.1 lib/qsagi/confirmed_queue.rb
qsagi-0.1.0 lib/qsagi/confirmed_queue.rb