lib/qsagi/queue.rb in qsagi-0.0.3 vs lib/qsagi/queue.rb in qsagi-0.1.0

- old
+ new

@@ -1,72 +1,55 @@ module Qsagi module Queue - def ack(message) - @queue.ack(:delivery_tag => message.delivery_tag) - end - - def clear - loop do - message = @queue.pop - break if message[:payload] == :queue_empty - end - end - - def connect - @client = Bunny.new(:host => self.class.host, :port => self.class.port, :heartbeat => self.class.heartbeat) - @client.start - @queue = @client.queue(self.class.queue_name, :durable => true, :arguments => {"x-ha-policy" => "all"}) - @exchange = @client.exchange(self.class._exchange) - @queue.bind(@exchange, :key => self.class.queue_name) unless self.class._exchange.empty? - end - - def disconnect - @client.send(:close_socket) unless @client.nil? - end - - def length - @queue.status[:message_count] - end - - def pop(options = {}) - auto_ack = options.fetch(:auto_ack, true) - message = @queue.pop(:ack => !auto_ack) - - unless message[:payload] == :queue_empty - self.class._message_class.new(message, self.class._serializer.deserialize(message[:payload])) - end - end - - def push(message) - serialized_message = self.class._serializer.serialize(message) - @exchange.publish(serialized_message, :key => @queue.name, :persistent => true, :mandatory => true) - end - - def reconnect - disconnect - connect - end - def self.included(klass) klass.extend ClassMethods end module ClassMethods - def connect(&block) - queue = new + def connect(opts={}, &block) + options = default_options.merge(opts) + queue = _queue(options) begin queue.connect block.call(queue) ensure queue.disconnect end end - def exchange(exchange) + def _queue(options) + standard_queue = StandardQueue.new(options) + if options[:queue_type] == :confirmed + ConfirmedQueue.new(standard_queue) + else + standard_queue + end + end + + def default_options + { + :host => host, + :port => port, + :queue_type => :standard, + :heartbeat => heartbeat, + :message_class => _message_class, + :queue_name => queue_name, + :durable => true, + :queue_arguments => {"x-ha-policy" => "all"}, + :persistent => true, + :mandatory => true, + :serializer => _serializer, + :exchange_options => _exchange_options, + :exchange => _exchange + } + end + + def exchange(exchange, options = {}) @exchange = exchange + @exchange_options = {:type => :direct}.merge(options) end def message_class(message_class) @message_class = message_class end @@ -75,9 +58,13 @@ @serializer = serializer end def _exchange @exchange || "" + end + + def _exchange_options + @exchange_options || {} end def _message_class @message_class || Qsagi::Message end