lib/amqp/queue.rb in carrot-0.7.0 vs lib/amqp/queue.rb in carrot-0.8.0
- old
+ new
@@ -1,12 +1,11 @@
module Carrot::AMQP
class Queue
- attr_reader :name, :server, :carrot
+ attr_reader :name, :carrot
attr_accessor :delivery_tag
def initialize(carrot, name, opts = {})
- @server = carrot.server
@opts = opts
@name = name
@carrot = carrot
server.send_frame(
Protocol::Queue::Declare.new({ :queue => name, :nowait => true }.merge(opts))
@@ -17,11 +16,11 @@
self.delivery_tag = nil
server.send_frame(
Protocol::Basic::Get.new({ :queue => name, :consumer_tag => name, :no_ack => !opts.delete(:ack), :nowait => true }.merge(opts))
)
method = server.next_method
- return unless method.is_a?(Protocol::Basic::GetOk)
+ return unless method.kind_of?(Protocol::Basic::GetOk)
self.delivery_tag = method.delivery_tag
header = server.next_payload
@@ -54,10 +53,12 @@
def status(opts = {}, &blk)
server.send_frame(
Protocol::Queue::Declare.new({ :queue => name, :passive => true }.merge(opts))
)
method = server.next_method
+ return [nil, nil] if method.kind_of?(Protocol::Connection::Close)
+
[method.message_count, method.consumer_count]
end
def bind(exchange, opts = {})
exchange = exchange.respond_to?(:name) ? exchange.name : exchange
@@ -81,9 +82,19 @@
def delete(opts = {})
server.send_frame(
Protocol::Queue::Delete.new({ :queue => name, :nowait => true }.merge(opts))
)
carrot.queues.delete(name)
+ end
+
+ def purge(opts = {})
+ server.send_frame(
+ Protocol::Queue::Purge.new({ :queue => name, :nowait => true }.merge(opts))
+ )
+ end
+
+ def server
+ carrot.server
end
private
def exchange
@exchange ||= Exchange.new(carrot, :direct, '', :key => name)