lib/qsagi/queue.rb in qsagi-0.0.3 vs lib/qsagi/queue.rb in qsagi-0.1.0
- old
+ new
@@ -1,72 +1,55 @@
module Qsagi
module Queue
- def ack(message)
- @queue.ack(:delivery_tag => message.delivery_tag)
- end
-
- def clear
- loop do
- message = @queue.pop
- break if message[:payload] == :queue_empty
- end
- end
-
- def connect
- @client = Bunny.new(:host => self.class.host, :port => self.class.port, :heartbeat => self.class.heartbeat)
- @client.start
- @queue = @client.queue(self.class.queue_name, :durable => true, :arguments => {"x-ha-policy" => "all"})
- @exchange = @client.exchange(self.class._exchange)
- @queue.bind(@exchange, :key => self.class.queue_name) unless self.class._exchange.empty?
- end
-
- def disconnect
- @client.send(:close_socket) unless @client.nil?
- end
-
- def length
- @queue.status[:message_count]
- end
-
- def pop(options = {})
- auto_ack = options.fetch(:auto_ack, true)
- message = @queue.pop(:ack => !auto_ack)
-
- unless message[:payload] == :queue_empty
- self.class._message_class.new(message, self.class._serializer.deserialize(message[:payload]))
- end
- end
-
- def push(message)
- serialized_message = self.class._serializer.serialize(message)
- @exchange.publish(serialized_message, :key => @queue.name, :persistent => true, :mandatory => true)
- end
-
- def reconnect
- disconnect
- connect
- end
-
def self.included(klass)
klass.extend ClassMethods
end
module ClassMethods
- def connect(&block)
- queue = new
+ def connect(opts={}, &block)
+ options = default_options.merge(opts)
+ queue = _queue(options)
begin
queue.connect
block.call(queue)
ensure
queue.disconnect
end
end
- def exchange(exchange)
+ def _queue(options)
+ standard_queue = StandardQueue.new(options)
+ if options[:queue_type] == :confirmed
+ ConfirmedQueue.new(standard_queue)
+ else
+ standard_queue
+ end
+ end
+
+ def default_options
+ {
+ :host => host,
+ :port => port,
+ :queue_type => :standard,
+ :heartbeat => heartbeat,
+ :message_class => _message_class,
+ :queue_name => queue_name,
+ :durable => true,
+ :queue_arguments => {"x-ha-policy" => "all"},
+ :persistent => true,
+ :mandatory => true,
+ :serializer => _serializer,
+ :exchange_options => _exchange_options,
+ :exchange => _exchange
+ }
+ end
+
+ def exchange(exchange, options = {})
@exchange = exchange
+ @exchange_options = {:type => :direct}.merge(options)
end
def message_class(message_class)
@message_class = message_class
end
@@ -75,9 +58,13 @@
@serializer = serializer
end
def _exchange
@exchange || ""
+ end
+
+ def _exchange_options
+ @exchange_options || {}
end
def _message_class
@message_class || Qsagi::Message
end