Sha256: ef218aa7ba330e3b641828c573ca40fe814a54928fb300c4c7e051527ce3d6d6

Contents?: true

Size: 1.84 KB

Versions: 3

Compression:

Stored size: 1.84 KB

Contents

module Qsagi
  # Wraps one Bunny connection, channel, exchange and queue, all configured
  # from the options hash handed to the constructor.
  #
  # Option keys read by this class:
  #   :host, :port, :heartbeat, :continuation_timeout, :username, :password
  #     - forwarded to Bunny.new
  #   :exchange, :exchange_options
  #     - name and declaration options of the exchange to publish through;
  #       a missing or empty name means no explicit queue binding is made
  #   :queue_name, :durable, :queue_arguments
  #     - name and declaration options of the queue
  #   :persistent, :mandatory
  #     - forwarded to Exchange#publish on every push
  #   :serializer
  #     - object responding to #serialize and #deserialize
  #   :message_class
  #     - class instantiated with (delivery_info, payload) for popped messages
  #
  # Nothing is connected until #connect is called; every method that touches
  # the channel or queue requires a prior #connect.
  class StandardQueue
    attr_reader :channel, :options

    # @param options [Hash] configuration, see the class-level key list
    def initialize(options={})
      @options = options
    end

    # Acknowledges a single message.
    #
    # @param message [#delivery_tag] a message previously returned by #pop
    def ack(message)
      # The second argument is passed as false; in Bunny's Channel#ack it is
      # the `multiple` flag, so only this one delivery is acknowledged.
      @channel.ack(message.delivery_tag, false)
    end

    # Rejects a single message.
    #
    # @param message [#delivery_tag] a message previously returned by #pop
    # @param options [Hash] :requeue (default true) is forwarded to the channel
    def reject(message, options={})
      requeue = options.fetch(:requeue, true)
      @channel.reject(message.delivery_tag, requeue)
    end

    # Purges the queue.
    def clear
      @queue.purge
    end

    # Opens the connection and channel, then declares the exchange and the
    # queue. The queue is bound to the exchange (routing key = queue name)
    # only when a non-empty exchange name was configured.
    def connect
      @client = Bunny.new(
        :host => options[:host],
        :port => options[:port],
        :heartbeat => options[:heartbeat],
        :continuation_timeout => options[:continuation_timeout],
        :username => options[:username],
        :password => options[:password]
      )
      @client.start
      @channel = @client.create_channel

      # A missing :exchange is treated like the empty (default) exchange name
      # and a missing :exchange_options like an empty hash, so partially
      # specified options no longer crash with NoMethodError on nil.
      exchange_name = options[:exchange] || ""
      exchange_options = options[:exchange_options] || {}

      @exchange = @channel.exchange(exchange_name, exchange_options)
      @queue = @channel.queue(
        options[:queue_name],
        :durable => options[:durable],
        :arguments => options[:queue_arguments]
      )
      @queue.bind(@exchange, :routing_key => options[:queue_name]) unless exchange_name.empty?
    end

    # Closes the connection if one was ever created; a no-op otherwise.
    def disconnect
      @client.close if @client
    end

    # @return the :message_count entry of the queue status
    def length
      @queue.status[:message_count]
    end

    # Fetches one message from the queue.
    #
    # @param options [Hash] :auto_ack (default true); when false the message
    #   must be settled later with #ack or #reject
    # @return [Object, nil] a new instance of the configured :message_class
    #   built from the delivery info and the deserialized payload, or nil when
    #   the queue returned no message
    def pop(options = {})
      auto_ack = options.fetch(:auto_ack, true)
      # NOTE(review): newer Bunny releases name this option :manual_ack
      # instead of :ack - confirm against the Bunny version this gem pins.
      delivery_info, _properties, payload = @queue.pop(:ack => !auto_ack)
      return if payload.nil?

      _message_class.new(delivery_info, _serializer.deserialize(payload))
    end

    # Serializes the message and publishes it through the exchange, routed by
    # the queue's name.
    #
    # @param message [Object] anything the configured serializer accepts
    def push(message)
      @exchange.publish(
        _serializer.serialize(message),
        :routing_key => @queue.name,
        :persistent => options[:persistent],
        :mandatory => options[:mandatory]
      )
    end

    # Closes the current connection (if any) and connects again.
    def reconnect
      disconnect
      connect
    end

    # @return the configured :message_class option
    def _message_class
      options[:message_class]
    end

    # @return the configured :serializer option
    def _serializer
      options[:serializer]
    end
  end
end

Version data entries

3 entries across 3 versions & 1 rubygems

Version Path
qsagi-0.1.3 lib/qsagi/standard_queue.rb
qsagi-0.1.2 lib/qsagi/standard_queue.rb
qsagi-0.1.1 lib/qsagi/standard_queue.rb