lib/amqp/queue.rb in carrot-0.7.0 vs lib/amqp/queue.rb in carrot-0.8.0

- old
+ new

@@ -1,12 +1,11 @@ module Carrot::AMQP class Queue - attr_reader :name, :server, :carrot + attr_reader :name, :carrot attr_accessor :delivery_tag def initialize(carrot, name, opts = {}) - @server = carrot.server @opts = opts @name = name @carrot = carrot server.send_frame( Protocol::Queue::Declare.new({ :queue => name, :nowait => true }.merge(opts)) @@ -17,11 +16,11 @@ self.delivery_tag = nil server.send_frame( Protocol::Basic::Get.new({ :queue => name, :consumer_tag => name, :no_ack => !opts.delete(:ack), :nowait => true }.merge(opts)) ) method = server.next_method - return unless method.is_a?(Protocol::Basic::GetOk) + return unless method.kind_of?(Protocol::Basic::GetOk) self.delivery_tag = method.delivery_tag header = server.next_payload @@ -54,10 +53,12 @@ def status(opts = {}, &blk) server.send_frame( Protocol::Queue::Declare.new({ :queue => name, :passive => true }.merge(opts)) ) method = server.next_method + return [nil, nil] if method.kind_of?(Protocol::Connection::Close) + [method.message_count, method.consumer_count] end def bind(exchange, opts = {}) exchange = exchange.respond_to?(:name) ? exchange.name : exchange @@ -81,9 +82,19 @@ def delete(opts = {}) server.send_frame( Protocol::Queue::Delete.new({ :queue => name, :nowait => true }.merge(opts)) ) carrot.queues.delete(name) + end + + def purge(opts = {}) + server.send_frame( + Protocol::Queue::Purge.new({ :queue => name, :nowait => true }.merge(opts)) + ) + end + + def server + carrot.server end private def exchange @exchange ||= Exchange.new(carrot, :direct, '', :key => name)