Sha256: 7c0c5467b8a0c9d98f0c115f5673a9428844ee6f610a297e1a621e2cd710469a

Contents?: true

Size: 1.81 KB

Versions: 2

Compression:

Stored size: 1.81 KB

Contents

module Qsagi
  module Queue
    def self.included(klass)
      klass.extend ClassMethods
    end

    module ClassMethods
      def connect(opts={}, &block)
        options = default_options.merge(opts)
        queue = _queue(options)

        begin
          queue.connect

          block.call(queue)
        ensure
          queue.disconnect
        end
      end

      def _queue(options)
        standard_queue = StandardQueue.new(options)
        if options[:queue_type] == :confirmed
          ConfirmedQueue.new(standard_queue)
        else
          standard_queue
        end
      end

      def default_options
        {
          :host => host,
          :port => port,
          :queue_type => :standard,
          :heartbeat => heartbeat,
          :message_class => _message_class,
          :queue_name => queue_name,
          :durable => true,
          :queue_arguments => {"x-ha-policy" => "all"},
          :persistent => true,
          :mandatory => true,
          :serializer => _serializer,
          :exchange_options => _exchange_options,
          :exchange => _exchange,
          :connect_timeout => 5,
          :read_timeout => 5,
          :write_timeout => 5,
          :logger => nil
        }
      end

      def exchange(exchange, options = {})
        @exchange = exchange
        @exchange_options = {:type => :direct}.merge(options)
      end

      def message_class(message_class)
        @message_class = message_class
      end

      def serializer(serializer)
        @serializer = serializer
      end

      def _exchange
        @exchange || ""
      end

      def _exchange_options
        @exchange_options || {}
      end

      def _message_class
        @message_class || Qsagi::Message
      end

      def _serializer
        @serializer || Qsagi::DefaultSerializer
      end
    end
  end
end

Version data entries

2 entries across 2 versions & 1 rubygems

Version Path
qsagi-0.2.3 lib/qsagi/queue.rb
qsagi-0.2.2 lib/qsagi/queue.rb